Vulnerability History
| Date | High Risk | Low Risk | 
|---|---|---|
| 2024-12-16 | 3 | 0 | 
Audit Report Details
- Lines of Code: 5998
- Open: 7
- Resolved: 0
🚨 High Risk Vulnerabilities
⚠️ Low Risk Vulnerabilities
Vulnerable Code:
1---
2File: /docs/mechanism.md
3---
4
5
6# DEPRECATED
7## Score Calculation Overview
8
9This section provides an overview of how challenge items are created for training a token compressor for large language models (LLMs), the purpose of each task, and how scores are calculated. The goal is to compress long contexts into shorter tokens while ensuring the integrity and usefulness of the output.
10
111. [Challenge Item Creation](#challenge-item-creation)
12    - [Datasets Used](#datasets-used)
13    - [Challenge Generation Process](#challenge-generation-process)
142. [Purpose of Each Task](#purpose-of-each-task)
15    - [Question Answering Task](#question-answering-task)
16    - [Causal Conversation Task](#causal-conversation-task)
17    - [Reconstruction Task](#reconstruction-task)
183. [Score Calculation](#score-calculation)
19    - [Criteria Used](#criteria-used)
20    - [Scoring Method](#scoring-method)
21    - [Rating Groups](#rating-groups)
22    - [Tier System](#tier-system)
23
24First, I want to credit [In-context Autoencoder](https://github.com/getao/icae) for the inspiration and for its paper on token compression.
25
26<div align="center">
27<img src="../assets/images/icae.png" alt="icae-method" width="100%">
28</div>
29
30### Challenge Item Creation for Synthetic Tasks
31
32**Preview:** https://huggingface.co/datasets/Condense-AI/synthetic-samples-v0.1
33
34<div align="center">
35<img src="https://github.com/user-attachments/assets/0734407e-f967-4b67-9a49-1da7c2c752f6" alt="preview-synth-dataset" width="75%">
36</div>
37
38This dataset is generated by the subnet's challenge-generation pipeline.
39To reproduce it, please run:
40```bash
41git clone https://github.com/condenses/neural-condense-subnet
42cd neural-condense-subnet
43pip install -e .
44python tests/test_synthetic.py
45```
46
47Console Output:
48```
49Benchmark Summary:
50Error count: 0
51Error rate: 0.00%
52Average processing times (seconds): {'question_answering': 0.024009417057037352, 'reconstruction': 0.0036168951988220215, 'continual_conversation': 0.004922831296920776}
53
54Context length statistics:
55Mean: 10116.86 characters
56Standard Deviation: 86.42 characters
57```
58
59#### Datasets Used
60
61The challenge items are created using a combination of datasets to simulate real-world scenarios:
62
63| Dataset Type | Datasets Used |
64|--------------|---------------|
65| Context Seeds | FineWeb-Pro and other context datasets loaded via `load_context_datasets()` |
66| Conversation Seeds | Infinity Instruct, Open Math Instruct, and other instruction datasets loaded via `load_instruct_datasets()` |
67| Cached Examples | Condense-AI/subnet-synthetic-dataset-v0.2 |
68
69#### Challenge Generation Process
70
71The challenge generation system consists of three main components:
72
731. **ConvoGenerator**: Handles the generation of conversations and QA pairs using an LLM API
742. **Scheduler**: Manages queues of pre-generated challenges using Redis
753. **ChallengeGenerator**: Creates the final challenge items in the correct protocol format
76
77The process involves:
78
791. **Queue Management**:
80   - Pre-fills Redis queues with cached examples from subnet-synthetic-dataset-v0.2
81   - Continuously refreshes queues to maintain a steady supply of challenges
82   - Monitors queue sizes and generates new items when needed
83
842. **Challenge Types**:
85   - `question_answering`: QA pairs embedded within conversations
86   - `causal_conversation`: Natural flowing conversations
87   - `reconstruct_conversation`: Tests ability to reformat conversations
88   - `trivial_qa_conversation`: Fill-in-the-blank style questions
89
903. **Protocol Creation**:
91   - Inserts special tokens (`<START-ACTIVATE-TOKEN>`, `<END-ACTIVATE-TOKEN>`)
92   - Formats context, activation prompt, and expected completion
93   - Applies chat template using the provided tokenizer
94
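As a rough illustration of the protocol-creation step above, here is a minimal sketch of how a challenge might be assembled. The special tokens come from the description above; the function name, argument names, and message layout are hypothetical, and `tokenizer` is assumed to be a Hugging Face tokenizer that supports `apply_chat_template`.

```python
# Hypothetical sketch of the protocol-creation step; not the subnet's actual code.
def build_challenge(tokenizer, context: str, activation_prompt: str, expected_completion: str) -> dict:
    # Mark the segment that the compressed representation must "activate" on.
    marked_prompt = "<START-ACTIVATE-TOKEN>" + activation_prompt + "<END-ACTIVATE-TOKEN>"

    # Format context + activation prompt with the model's chat template.
    messages = [{"role": "user", "content": context + "\n" + marked_prompt}]
    formatted = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )

    return {
        "context": context,
        "activation_prompt": marked_prompt,
        "expected_completion": expected_completion,
        "formatted_prompt": formatted,
    }
```
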
95### Purpose of Each Task
96
97The system supports four main task types:
98
99#### Question Answering Task
100
101**Objective**: Test comprehension of context and ability to answer specific questions.
102
103- **Process**:
104  - Generates QA pairs from context using LLM
105  - Embeds QA within larger conversations
106  - Number of QA pairs controlled by `n_qa_per_context` (default: 4)
107
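As a purely illustrative sketch of "embedding QA pairs within larger conversations" (the real generator produces the pairs with an LLM API; the helper below is hypothetical):

```python
def embed_qa_in_conversation(conversation: list[dict], qa_pairs: list[tuple[str, str]]) -> list[dict]:
    """Append each (question, answer) pair to an existing conversation as extra turns."""
    messages = list(conversation)
    for question, answer in qa_pairs:
        messages.append({"role": "user", "content": question})
        messages.append({"role": "assistant", "content": answer})
    return messages
```
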
108#### Causal Conversation Task
109
110**Objective**: Test ability to maintain conversation flow and context.
111
112- **Process**:
113  - Uses conversation seeds from instruction datasets
114  - Generates additional turns using LLM
115  - Controls conversation length with `max_turns` parameter
116
117#### Reconstruction Task
118
119**Objective**: Test ability to reformat conversations while maintaining content.
120
121- **Process**:
122  - Takes existing conversation
123  - Requires reformatting into specific template:
124    ```
125    [Role]: [Message]
126    Example:
127    - User: Hello, how are you?
128    - Assistant: I am fine, thank you.
129    ```
130
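A minimal sketch of the expected reformatting, assuming the conversation is a list of `{"role", "content"}` messages (hypothetical helper, not the subnet's code):

```python
def reformat_conversation(messages: list[dict]) -> str:
    # Render each turn as "- Role: message", matching the template above.
    return "\n".join(f"- {m['role'].capitalize()}: {m['content']}" for m in messages)

print(reformat_conversation([
    {"role": "user", "content": "Hello, how are you?"},
    {"role": "assistant", "content": "I am fine, thank you."},
]))
```
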
131#### Trivial QA Task
132
133**Objective**: Test basic comprehension with fill-in-the-blank questions.
134
135- **Process**:
136  - Selects sentence from conversation
137  - Creates blank with surrounding context
138  - Expects exact completion of missing text
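
A rough sketch of this fill-in-the-blank construction (hypothetical helper; the real implementation selects the sentence from a generated conversation):

```python
import random

def make_trivial_qa(sentences: list[str]) -> dict:
    # Blank out one sentence and ask for its exact text back.
    idx = random.randrange(len(sentences))
    blanked = sentences[:idx] + ["_____"] + sentences[idx + 1:]
    return {
        "prompt": "Fill in the blank: " + " ".join(blanked),
        "expected_completion": sentences[idx],
    }
```
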
139### Score Calculation
140
141#### Criteria Used
142
143Scores are calculated based on perplexity and ELO ranking in batched competitions:
144
145| Criteria | Description |
146|----------|-------------|
147| Perplexity | Measures how well the model predicts the expected completion tokens, with lower values indicating better performance |
148| ELO Rating | A relative rating system that adjusts based on performance against other miners in the batch |
149
150#### Scoring Method
151
1521. **Perplexity Calculation**:
153   ```python
154   # Lower perplexity = better performance
155   perplexity = exp(-1/N * Σ log P(token_i))
156   ```
157
1582. **Batch Competition**:
159   - Miners are grouped into batches of size `BATCH_SIZE` (default: 4)
160   - Each batch competes on the same challenge
161   - Performance is relative within the batch
162   - Groups are formed based on ELO ratings to ensure fair competition
163
1643. **ELO Rating Updates**:
165   ```python
166   # K-factor varies based on rating tier:
167   # - Beginner (0-800): K=24
168   # - Intermediate (800-1600): K=16
169   # - Advanced (1600-3000): K=4
170   
171   expected_score = 1 / (1 + 10 ** ((opponent_rating - player_rating) / 400))
172   new_rating = current_rating + K * (actual_score - expected_score)
173   ```
174
1754. **Accelerator Rewards**:
176   Additional rewards are calculated based on:
177   ```python
178   compress_rate = 1 - len(compressed_tokens) / max_condensed_tokens
179   time_score = 1 - process_time / timeout
180   accelerator = max(0, (compress_rate + time_score) / 2)
181   ```
182
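To make the formulas above concrete, here is a small, self-contained sketch that strings them together: perplexity, the ELO expected score and update with the tiered K-factor, and the accelerator reward. It illustrates the formulas as written above and is not the subnet's actual scoring code.

```python
import math

def perplexity(token_probs: list[float]) -> float:
    # perplexity = exp(-1/N * sum(log P(token_i))); lower is better.
    return math.exp(-sum(math.log(p) for p in token_probs) / len(token_probs))

def k_factor(rating: float) -> int:
    # K-factor by rating tier (see the Rating Groups table below).
    if rating < 800:
        return 24
    if rating < 1600:
        return 16
    return 4

def elo_update(player_rating: float, opponent_rating: float, actual_score: float) -> float:
    expected = 1 / (1 + 10 ** ((opponent_rating - player_rating) / 400))
    return player_rating + k_factor(player_rating) * (actual_score - expected)

def accelerator_reward(n_compressed_tokens: int, max_condensed_tokens: int,
                       process_time: float, timeout: float) -> float:
    compress_rate = 1 - n_compressed_tokens / max_condensed_tokens
    time_score = 1 - process_time / timeout
    return max(0.0, (compress_rate + time_score) / 2)

# Example: a 1000-rated miner that wins against a 1200-rated miner gains rating.
print(round(elo_update(1000, 1200, actual_score=1.0), 1))  # ~1012.2
```
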
183#### Rating Groups
184
185Miners are divided into ELO rating groups that affect their K-factor:
186
187| Group | Rating Range | K-factor |
188|-------|-------------|----------|
189| Beginner | 0-800 | 24 |
190| Intermediate | 800-1600 | 16 |
191| Advanced | 1600-3000 | 4 |
192
193#### Tier System
194
195Each tier has specific configuration and incentive percentages:
196
197| Tier | Incentive % | Max Tokens | Context Length |
198|------|-------------|------------|----------------|
199| Research | 100% | 1024 | 10,000 chars |
200| Inference 0 | 0% | 1024 | 15,000 chars |
201| Inference 1 | 0% | 2048 | 20,000 chars |
202
203### Configuration
204
205The challenge system is configured through constants:
206
207```python
208SYNTHETIC_TASK_CONFIG = [
209    {
210        "task": "causal_conversation",
211        "criterias": ["perplexity"],
212        "rewarding_frequency": 1,
213        "weight": 1,
214    },
215    # ... other tasks ...
216]
217```
218
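Each entry's `weight` controls how often its task is sampled. A simplified illustration of the weighted selection (mirroring the validator's use of `random.choices`; the configs below are truncated placeholders):

```python
import random

# Truncated, illustrative task configs; only "task" and "weight" matter here.
SYNTHETIC_TASK_CONFIG = [
    {"task": "causal_conversation", "weight": 1},
    {"task": "question_answering", "weight": 1},
    {"task": "reconstruct_conversation", "weight": 1},
]

def get_task_config() -> dict:
    # Sample one task config with probability proportional to its weight.
    weights = [t["weight"] for t in SYNTHETIC_TASK_CONFIG]
    return random.choices(SYNTHETIC_TASK_CONFIG, weights=weights)[0]

print(get_task_config()["task"])
```
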
219Each tier has specific configuration for context length and token limits:
220
221```python
222TIER_CONFIG = {
223    "research": {
224        "max_context_length_in_chars": 10000,
225        "max_condensed_tokens": 1024,
226        "min_condensed_tokens": 128,
227        # ... other settings ...
228    },
229    # ... other tiers ...
230}
231```
232
233### Summary
234
235The token compression challenge is designed to rigorously test the token compressor's effectiveness in various scenarios by:
236
237- Using diverse datasets to simulate real-world inputs.
238- Creating tasks that challenge different aspects of comprehension and context retention.
239- Calculating scores based on LM metrics (perplexity,...) and ELO rating.
240
241By understanding how the challenge items are created, the purpose behind each task, and the scoring methodology, developers can better train and evaluate token compressors for LLMs.
242
243
244
245
246---
247File: /docs/miner.md
248---
249
250<div align="center">
251
252# ⚡ Miner Documentation
253
254</div>
255
256## Minimum Requirements for Baseline
257- GPU with at least 24GB of VRAM (RTX 4090, A6000, A100, H100, etc.) to run Baseline Model
258- CUDA, NVIDIA Driver installed
259- PM2 install (see [Guide to install PM2](./pm2.md))
260- Setup a cloud storage for uploading miner outputs. Here are some recommended options:
261    - `Huggingface Hub` (free but has some limitations)
262    - `AWS S3`
263    - `minio` (open-source version of AWS S3) (see [Guide to install MINIO](./minio.md))
264    - `Google Cloud Storage`
265    - `Azure Blob Storage`
266
267## What does a Miner do?
268
269A miner is a node responsible for condensing a long text into a much shorter sequence of condensed tokens & activations. These condensed tokens & activations are then fed to Large Language Models like Llama, Gemma, Mistral, etc.
270
271## How does a Miner work?
272
273We (subnet owner) provide some baselines for miners. But miners have to research their own algorithms to be more competitive. We also have a mission to push the latest SOTA algorithms to the miners as soon as possible.
274
275So basically, there are a few things that a miner has to do:
276
2771. Select a TIER: we have 2 tiers: research, universal. You can see the details in the miner's config file: `neural_condense_core/constants.py` or at the [README.md](../README.md) doc.
278
2792. Implement your own algorithm or pick one of our baseline algorithms. You can find the baseline algorithms in the `services/miner_backend/` folder.
280The backend API schema is very simple: the `Validator` sends you a dictionary with `context: str` and `target_model: str` (a minimal backend sketch is shown after this list).
281- For `research` tier:
282The miner runs their own backend, which produces the KV cache of the target LLM. The miner then uploads the KV cache to `minio` storage and returns the `minio` path to the `Validator`.
283  - `past_key_values: Tuple[Tuple[torch.FloatTensor]]` is the format of the KV cache. It is loaded into the LLM using `transformers.DynamicCache.from_legacy_cache(past_key_values)`.
284- For `universal` tier:
285The miner runs their own backend, which produces a compressed text representation, and returns the compressed text to the `Validator` as an attribute of `SynapseResponse`.
286
287
2883. After having a competitive backend, benchmark it to make sure it meets the speed and load requirements defined for the tier. **Our baselines are required to use GPU**.
289
2904. Register your slot and start mining.
291
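For orientation, here is a toy sketch of what a universal-tier backend endpoint could look like, assuming FastAPI. The real backends live in `services/miner_backend/` and use their own app factories; the endpoint path, request model, and "compression" below are placeholders only.

```python
# Toy sketch of a universal-tier miner backend (assumes FastAPI is installed).
from fastapi import FastAPI
from pydantic import BaseModel

class CondenseRequest(BaseModel):
    context: str
    target_model: str

app = FastAPI()

@app.post("/condense")
def condense(request: CondenseRequest) -> dict:
    # Placeholder "compression": keep the first half of the characters.
    # A real miner plugs in its own compression algorithm here.
    compressed_context = request.context[: len(request.context) // 2]
    return {"compressed_context": compressed_context}
```
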
292## Steps to setup a Miner
293
294### 1. Clone the repository
295```bash
296git clone https://github.com/condenses/neural-condense-subnet
297cd neural-condense-subnet
298```
299
300### 2. Install the dependencies
301```bash
302pip install uv
303uv sync --prerelease=allow
304. .venv/bin/activate
305. scripts/install_redis.sh
306```
307
308### 3. Configure your wallet, backend, etc. Below is just an example:
309
310**Parameters**
311- `--miner.tier` - The selected tier; it should be suitable for your backend.
312- `--netuid` - The network UID of the subnet.
313- `--subtensor.network` - The Subtensor network to connect to. `finney` for the mainnet. `test` for the testnet.
314- `--wallet.name` - The name of the wallet to use.
315- `--wallet.hotkey` - The hotkey of the wallet to use.
316- `--axon.port` - The port to be posted to metagraph.
317- `--miner.backend_host` - The host of the miner backend for condensing.
318- `--miner.backend_port` - The port of the miner backend for condensing.
319
320**Important**: `axon_port` must be opened in your firewall.
321
322**Define bash variable in your terminal**
323```bash
324miner_tier="research" # or "universal"
325miner_wallet="my_wallet"
326miner_hotkey="my_hotkey"
327miner_backend_host="localhost"
328miner_backend_port=8080
329miner_axon_port=12345
330miner_netuid=47
331miner_subtensor_network="finney"
332```
333
334### 4. Run the miner backend.
335
336#### 4.a. Research tier:
337You have to collect the `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`, `MINIO_BUCKET`, `MINIO_SERVER` from the minio setup (see [minio.md](./minio.md)).
338
339There are three compression algorithms available:
340- `kvpress`: Basic KV-cache compression
341- `soft_token`: Soft token compression (requires additional model)
342- `activation_beacon`: Activation beacon compression
343
344```bash
345export MINIO_ACCESS_KEY="your_minio_access_key"
346export MINIO_SECRET_KEY="your_minio_secret_key"
347export MINIO_BUCKET="condense"
348export MINIO_SERVER="your_minio_server"
349
350# Choose one of the following commands based on your preferred algorithm:
351
352# For KVPress compression:
353pm2 start python --name condense_miner_backend \
354-- -m gunicorn "services.miner_backend.app:create_app('kvpress')" \
355--timeout 120 \
356--bind 0.0.0.0:$miner_backend_port
357
358# For Soft Token compression:
359pm2 start python --name condense_miner_backend \
360-- -m gunicorn "services.miner_backend.app:create_app('soft_token')" \
361--timeout 120 \
362--bind 0.0.0.0:$miner_backend_port
363
364# For Activation Beacon compression:
365pm2 start python --name condense_miner_backend \
366-- -m gunicorn "services.miner_backend.app:create_app('activation_beacon')" \
367--timeout 120 \
368--bind 0.0.0.0:$miner_backend_port
369```
370
371**Note**: 
372- If using `soft_token` algorithm, you can train your own model using our prepared trainer at [Condense-Trainer](https://github.com/condenses/condense-trainer).
373- Each algorithm has different GPU memory requirements:
374  - `kvpress`: ~24GB VRAM
375  - `soft_token`: ~24GB VRAM + additional memory for condenser model
376  - `activation_beacon`: ~24GB VRAM
377
378You can also run the backend directly without PM2 for testing:
379```bash
380python -m gunicorn "services.miner_backend.app:create_app('kvpress')" --bind 0.0.0.0:8080
381```
382#### 4.b. Universal tier:
383You can check the default `llmlingua-2` model in the `services.miner_backend.universal_app` folder, and develop your own model to further improve the performance.
384```bash
385pm2 start python --name miner_universal_backend \
386	-- -m gunicorn "services.miner_backend.universal_app:create_app('llmlingua-2')" \
387	--timeout 120 \
388	--bind 0.0.0.0:8080
389```
390
391
392### 5. Run the mining script
393```bash
394pm2 start python --name condense_miner \
395-- -m neurons.miner \
396--netuid $miner_netuid \
397--subtensor.network $miner_subtensor_network \
398--wallet.name $miner_wallet \
399--wallet.hotkey $miner_hotkey \
400--miner.tier $miner_tier \
401--miner.backend_host $miner_backend_host \
402--miner.backend_port $miner_backend_port \
403--axon.port $miner_axon_port
404```
405
406
407---
408File: /docs/minio.md
409---
410
411### Setting Up MinIO Storage for Uploading Condensed Responses
412
413#### Overview
414
415The miner will generate a URL and share it with the validator. The validator will use this URL to download the condensed response. 
416
417To facilitate this, a public storage solution is needed to ensure the validator can access the file. This guide demonstrates how to use MinIO for this purpose.
418
419For full instructions, refer to the [MinIO documentation](https://min.io/docs/minio/linux/operations/installation.html).
420
421Other storage options like AWS S3 or GCP Storage are also viable, as long as the validator can download the file from the provided URL.
422
423For a quick setup, you can use [Railway](https://railway.app/). Note, however, that it is a centralized service and may come with certain limitations. Use the provided template for deployment:
424
425[Deploy on Railway](https://railway.app/template/lRrxfF?referralCode=xpVB_C)
426
427**Important:** After setting up MinIO, you'll need the following credentials:
428- `MINIO_ACCESS_KEY`
429- `MINIO_SECRET_KEY`
430- `MINIO_BUCKET`
431- `MINIO_SERVER`
432
433These details will be essential for configuring the miner.
434
435---
436
437### Step-by-Step Setup on a New Machine
438
4391. **Install the MinIO server**  
440   Follow the setup instructions here: [Deploy MinIO on a Single Node](https://min.io/docs/minio/linux/operations/install-deploy-manage/deploy-minio-single-node-single-drive.html).  
441   Ensure the MinIO server port is exposed publicly by including `--address 0.0.0.0:$open_port`.
442
443   - If running MinIO on the same machine, set `MINIO_SERVER` to `http://localhost:<open_port>`.  
444   - If hosted on a remote machine, use `http://<remote_machine_ip>:<open_port>`.
445
4462. **Create a bucket**  
447   Name the bucket `condense_miner` and configure it to be public.
448
4493. **Generate credentials**  
450   Create the following credentials:
451   - `MINIO_ACCESS_KEY`
452   - `MINIO_SECRET_KEY`
453
4544. **Retrieve the server address**  
455   Record the `MINIO_SERVER` address to be used for miner setup.
456
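As a quick orientation, here is a minimal sketch of uploading a condensed response and building the URL a validator would download, assuming the `minio` Python client, a public bucket, and the environment variables named above. The object name and local file are placeholders; the actual miner code may differ.

```python
import os
from minio import Minio

# The minio client expects "host:port" without the scheme.
endpoint = os.environ["MINIO_SERVER"].replace("http://", "").replace("https://", "")
client = Minio(
    endpoint,
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,  # set True if the server is behind HTTPS
)

bucket = os.environ["MINIO_BUCKET"]
object_name = "condensed/example.npy"
client.fput_object(bucket, object_name, "example.npy")

# For a public bucket, the validator can download the object directly:
print(f"{os.environ['MINIO_SERVER']}/{bucket}/{object_name}")
```
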
457
458---
459File: /docs/pm2.md
460---
461
462# Guide to install PM2
463
464## What is PM2?
465
466PM2 is a process manager for Node.js applications. It is a simple and easy-to-use tool that allows you to keep your Node.js applications alive and running, even when they crash. PM2 also provides a built-in load balancer, so you can easily scale your applications across all available CPUs.
467
468## Installation
469
4701. Install Node Version Manager (NVM) by running the following command:
471
472```bash
473curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.0/install.sh | bash
474```
475
4762. Restart your terminal and install Node.js by running the following command:
477
478```bash
479export NVM_DIR="$HOME/.nvm"
480nvm install 22
481```
482
4833. Install PM2 by running the following command:
484
485```bash
486npm i pm2 -g
487```
488
4894. Restart your terminal and verify the installation by running the following command:
490
491```bash
492pm2 --version
493```
494
495
496---
497File: /docs/validator.md
498---
499
500<div align="center">
501
502# ⚡ Validator Documentation
503
504</div>
505
506## Minimum Requirements
507- GPU with at least 80GB of VRAM (A100, H100, etc.) to run LLMs and Reward Model
508- 512GB of SSD storage
509- CUDA, NVIDIA Driver installed
510- Internet connection with at least 4Gbps
511- PM2 install (see [Guide to install PM2](./pm2.md))
512- Obtain TogetherAI API Key: https://api.together.ai/settings/api-keys
513
514## What does a Validator do?
515
516- Send synthetic requests & evaluate miners' performance using prepared tasks: autoencoder, question-answering, conversation, etc.
517- Forward Organic API requests if you want to sell your bandwidth to end-users.
518
519## Steps to setup a Validator
520
5211. Clone the repository
522```bash
523git clone https://github.com/condenses/neural-condense-subnet
524cd neural-condense-subnet
525```
526
5272. Install the dependencies
528```bash
529pip install uv
530uv sync --prerelease=allow
531. .venv/bin/activate
532. scripts/install_redis.sh
533```
534To test if Redis is working correctly, run `redis-cli ping` and it should return `PONG`.
535
536**Optional**
537- Login to Weights & Biases to use the logging feature
538```bash
539wandb login
540```
541
5423. Configure your wallet, backend host, and port. Below is just an example:
543
544**Parameters**
545- `--netuid` - The network UID of the subnet.
546- `--subtensor.network` - The Subtensor network to connect to. `finney` for the mainnet. `test` for the testnet.
547- `--wallet.name` - The name of the wallet to use.
548- `--wallet.hotkey` - The hotkey of the wallet to use.
549- `--axon.port` - The port to be posted to metagraph.
550- `--validator.score_backend.host` - The host of the validator backend for scoring.
551- `--validator.score_backend.port` - The port of the validator backend for scoring.
552- `--validator.gate_port` - The port to open for the validator to forward requests from end-users to the miner. It should be an open port in your firewall. It's optional.
553- `--validator.use_wandb` - Use Weights & Biases for logging. It's optional.
554
555**Important**: `axon_port` and `gate_port` must be opened in your firewall.
556
557**Define bash variable in your terminal**
558```bash
559val_wallet="my_wallet"
560val_hotkey="my_hotkey"
561val_backend_host="localhost"
562val_backend_port=8080
563val_universal_backend_host="localhost"
564val_universal_backend_port=8090
565val_axon_port=12345
566val_gate_port=12346
567val_netuid=47
568val_subtensor_network="finney"
569```
570
5714. Run the validator backend.
572***NOTE***: You have to run **both** the research and universal tier backend.
573To run the research tier backend, run:
574```bash
575pm2 start python --name condense_validator_research_backend \
576-- -m gunicorn services.validator_backend.scoring.app:app \
577--workers 1 \
578--bind $val_backend_host:$val_backend_port \
579--timeout 0
580```
581
582To run the universal tier backend, run:
583```bash
584export TOGETHER_API_KEY=your_together_api_key
585pm2 start python --name condense_validator_universal_backend \
586-- -m gunicorn services.validator_backend.universal_scoring.app:app \
587--workers 1 -k uvicorn.workers.UvicornWorker \
588--bind $val_universal_backend_host:$val_universal_backend_port \
589--timeout 0
590```
591
5925. Run the validator script
593```bash
594export HF_HUB_ENABLE_HF_TRANSFER=1
595pm2 start python --name condense_validator \
596-- -m neurons.validator \
597--netuid $val_netuid \
598--subtensor.network $val_subtensor_network \
599--wallet.name $val_wallet \
600--wallet.hotkey $val_hotkey \
601--axon.port $val_axon_port \
602--validator.score_backend.host $val_backend_host \
603--validator.score_backend.port $val_backend_port \
604--validator.universal_score_backend.host $val_universal_backend_host \
605--validator.universal_score_backend.port $val_universal_backend_port \
606--validator.use_wandb
607```
608
6096. Run the auto update script; it will check for updates every 30 minutes.
610```bash
611pm2 start auto_update.sh --name "auto_updater"
612```
613
6147. Run the Organic Server to use the Organic API
615```bash
616pm2 start python --name condense_organic \
617-- -m services.validator_backend.organic.app:app \
618--netuid $val_netuid \
619--subtensor.network $val_subtensor_network \
620--wallet.name $val_wallet \
621--wallet.hotkey $val_hotkey \
622--axon.port $val_axon_port \
623--validator.gate_port $val_gate_port
624```
625
626
627
628---
629File: /neural_condense_core/base/__init__.py
630---
631
632from .miner import Miner as BaseMiner
633from .validator import Validator as BaseValidator
634
635__all__ = ["BaseMiner", "BaseValidator"]
636
637
638
639---
640File: /neural_condense_core/base/config.py
641---
642
643import bittensor as bt
644from argparse import ArgumentParser
645from ..constants import constants
646
647
648def add_common_config(parser: ArgumentParser):
649    parser.add_argument("--netuid", type=int, default=1, help="The chain subnet uid.")
650    parser.add_argument(
651        "--whitelist_uids",
652        type=str,
653        default=None,
654        help="The uids to whitelist. For testing purposes.",
655    )
656    bt.subtensor.add_args(parser)
657    bt.logging.add_args(parser)
658    bt.wallet.add_args(parser)
659    bt.axon.add_args(parser)
660    return parser
661
662
663def add_validator_config(parser: ArgumentParser):
664    parser.add_argument(
665        "--validator.gate_port",
666        type=int,
667        default=None,
668        help="The port of the validator gate server.",
669    )
670
671    parser.add_argument(
672        "--validator.score_backend.host",
673        type=str,
674        default="localhost",
675        help="The host of the score backend server.",
676    )
677    parser.add_argument(
678        "--validator.score_backend.port",
679        type=int,
680        default=8089,
681        help="The port of the score backend server.",
682    )
683    parser.add_argument(
684        "--validator.universal_score_backend.host",
685        type=str,
686        default="localhost",
687        help="The host of the universal score backend server.",
688    )
689    parser.add_argument(
690        "--validator.universal_score_backend.port",
691        type=int,
692        default=8090,
693        help="The port of the universal score backend server.",
694    )
695
696    parser.add_argument(
697        "--validator.organic_client_url",
698        type=str,
699        default=constants.ORGANIC_CLIENT_URL,
700        help="The URL of the organic client.",
701    )
702
703    parser.add_argument(
704        "--validator.report_url",
705        type=str,
706        default=constants.REPORT_URL,
707        help="The URL of the report server.",
708    )
709
710    parser.add_argument(
711        "--validator.use_wandb",
712        action="store_true",
713        help="Whether to use wandb for logging.",
714    )
715
716    return parser
717
718
719def add_miner_config(parser: ArgumentParser):
720    tier_names = list(constants.TIER_CONFIG.keys())
721    parser.add_argument(
722        "--miner.backend_host",
723        type=str,
724        default="localhost",
725        help="The host of the backend server.",
726    )
727    parser.add_argument(
728        "--miner.backend_port",
729        type=int,
730        default=8088,
731        help="The port of the backend server.",
732    )
733    parser.add_argument(
734        "--miner.tier",
735        choices=tier_names,
736    )
737    return parser
738
739
740
741---
742File: /neural_condense_core/base/miner.py
743---
744
745import os
746import argparse
747import bittensor as bt
748from typing import Tuple
749from .config import add_common_config, add_miner_config
750from ..protocol import Metadata
751
752
753class Miner:
754    def __init__(self):
755        self.config = self.get_config()
756        self.blacklist_fns = [self._blacklist_fn]
757        self.forward_fns = [self._forward_metadata]
758        self.setup_bittensor_objects()
759        self.metadata = {
760            "tier": self.config.miner.tier,
761        }
762
763    def get_config(self):
764        parser = argparse.ArgumentParser()
765        parser = add_miner_config(parser)
766        parser = add_common_config(parser)
767        config = bt.config(parser)
768        config.full_path = os.path.expanduser(
769            "{}/{}/{}/netuid_{}/{}".format(
770                config.logging.logging_dir,
771                config.wallet.name,
772                config.wallet.hotkey_str,
773                config.netuid,
774                "miner",
775            )
776        )
777        print(config)
778        os.makedirs(config.full_path, exist_ok=True)
779        return config
780
781    def setup_logging(self):
782        bt.logging.enable_default()
783        bt.logging.enable_info()
784
785        if self.config.logging.debug:
786            bt.logging.enable_debug()
787        if self.config.logging.trace:
788            bt.logging.enable_trace()
789        # Activate Bittensor's logging with the set configurations.
790        bt.logging(config=self.config, logging_dir=self.config.full_path)
791        bt.logging.info(
792            f"Running miner for subnet: {self.config.netuid} on network: {self.config.subtensor.network} with config:"
793        )
794        bt.logging.info(self.config)
795
796    def setup_bittensor_objects(self):
797        bt.logging.info("Setting up Bittensor objects.")
798        self.wallet = bt.wallet(config=self.config)
799        bt.logging.info(f"Wallet: {self.wallet}")
800        self.subtensor = bt.subtensor(config=self.config)
801        bt.logging.info(f"Subtensor: {self.subtensor}")
802        self.metagraph = self.subtensor.metagraph(self.config.netuid)
803        bt.logging.info(f"Metagraph: {self.metagraph}")
804        if self.wallet.hotkey.ss58_address not in self.metagraph.hotkeys:
805            bt.logging.error(
806                f"\nYour miner: {self.wallet} is not registered to chain connection: {self.subtensor} \nRun 'btcli register' and try again."
807            )
808            exit()
809        else:
810            self.my_subnet_uid = self.metagraph.hotkeys.index(
811                self.wallet.hotkey.ss58_address
812            )
813            bt.logging.info(f"Running miner on uid: {self.my_subnet_uid}")
814
815    def setup_axon(self):
816        self.axon = bt.axon(
817            wallet=self.wallet,
818            port=self.config.axon.port,
819            external_port=self.config.axon.external_port,
820        )
821        bt.logging.info("Attaching forward function to axon.")
822        for blacklist_fn, forward_fn in zip(self.blacklist_fns, self.forward_fns):
823            bt.logging.info(
824                f"Attaching blacklist_fn: {blacklist_fn} and forward_fn: {forward_fn}"
825            )
826            self.axon.attach(
827                forward_fn=forward_fn,
828                blacklist_fn=blacklist_fn,
829            )
830        bt.logging.info(
831            f"Serving axon on network: {self.config.subtensor.network} with netuid: {self.config.netuid}"
832        )
833        self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor)
834        bt.logging.info(f"Axon: {self.axon}")
835        bt.logging.info(f"Starting axon server on port: {self.config.axon.port}")
836        self.axon.start()
837
838    def _forward_metadata(self, synapse: Metadata) -> Metadata:
839        synapse.metadata = self.metadata
840        return synapse
841
842    def _blacklist_fn(self, synapse: Metadata) -> Tuple[bool, str]:
843        return False, "Always pass."
844
845
846
847---
848File: /neural_condense_core/base/validator.py
849---
850
851import os
852import asyncio
853import argparse
854import traceback
855import bittensor as bt
856import time
857import threading
858from .config import add_common_config, add_validator_config
859from abc import abstractmethod, ABC
860from ..constants import constants
861from ..logger import logger
862
863
864class Validator(ABC):
865    def __init__(self):
866        self.config = self.get_config()
867        print(self.config)
868        self.setup_logging()
869        self.setup_bittensor_objects()
870        self.last_update = 0
871        self.current_block = 0
872        self.uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address)
873        self.is_running = False
874        self.should_exit = False
875        self.setup_axon()
876        self.loop = asyncio.get_event_loop()
877
878    def get_config(self):
879        parser = argparse.ArgumentParser()
880        parser = add_validator_config(parser)
881        parser = add_common_config(parser)
882        config = bt.config(parser)
883        config.full_path = os.path.expanduser(
884            "{}/{}/{}/netuid{}/{}".format(
885                config.logging.logging_dir,
886                config.wallet.name,
887                config.wallet.hotkey_str,
888                config.netuid,
889                "validator",
890            )
891        )
892        os.makedirs(config.full_path, exist_ok=True)
893        return config
894
895    def setup_logging(self):
896        bt.logging.enable_default()
897        bt.logging.enable_info()
898
899        if self.config.logging.debug:
900            bt.logging.enable_debug()
901        if self.config.logging.trace:
902            bt.logging.enable_trace()
903        bt.logging(config=self.config, logging_dir=self.config.full_path)
904        logger.info(
905            f"Running validator for subnet: {self.config.netuid} on network: {self.config.subtensor.network} with config:"
906        )
907        logger.info(self.config)
908        pass
909
910    def setup_bittensor_objects(self):
911        logger.info("Setting up Bittensor objects.")
912        self.wallet = bt.wallet(config=self.config)
913        logger.info(f"Wallet: {self.wallet}")
914        self.subtensor = bt.subtensor(config=self.config)
915        logger.info(f"Subtensor: {self.subtensor}")
916        self.dendrite = bt.dendrite(wallet=self.wallet)
917        logger.info(f"Dendrite: {self.dendrite}")
918        self.metagraph = self.subtensor.metagraph(self.config.netuid)
919        logger.info(f"Metagraph: {self.metagraph}")
920        if self.wallet.hotkey.ss58_address not in self.metagraph.hotkeys:
921            logger.error(
922                f"\nYour validator: {self.wallet} is not registered to chain connection: {self.subtensor} \nRun 'btcli register' and try again."
923            )
924            exit()
925        else:
926            self.my_subnet_uid = self.metagraph.hotkeys.index(
927                self.wallet.hotkey.ss58_address
928            )
929            logger.info(f"Running validator on uid: {self.my_subnet_uid}")
930
931    def setup_axon(self):
932        self.axon = bt.axon(wallet=self.wallet, config=self.config)
933        logger.info(
934            f"Serving axon on network: {self.config.subtensor.network} with netuid: {self.config.netuid}"
935        )
936        self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor)
937        logger.info(f"Axon: {self.axon}")
938        logger.info(f"Starting axon server on port: {self.config.axon.port}")
939        self.axon.start()
940
941    @abstractmethod
942    async def start_epoch(self):
943        pass
944
945    def run(self):
946        logger.info("Starting validator loop.")
947        while not self.should_exit:
948            start_epoch = time.time()
949
950            try:
951                self.loop.run_until_complete(self.start_epoch())
952            except Exception as e:
953                logger.error(f"Forward error: {e}")
954                traceback.print_exc()
955
956            end_epoch = time.time()
957            elapsed = end_epoch - start_epoch
958            time_to_sleep = max(0, constants.EPOCH_LENGTH - elapsed)
959
960            logger.info(f"Epoch finished. Sleeping for {time_to_sleep} seconds.")
961            time.sleep(time_to_sleep)
962
963            try:
964                self.resync_metagraph()
965            except Exception as e:
966                logger.error(f"Resync metagraph error: {e}")
967                traceback.print_exc()
968
969                # If someone intentionally stops the validator, it'll safely terminate operations.
970            except KeyboardInterrupt:
971                self.axon.stop()
972                logger.success("Validator killed by keyboard interrupt.")
973                exit()
974
975    @abstractmethod
976    def set_weights(self):
977        pass
978
979    def resync_metagraph(self):
980        self.metagraph.sync()
981
982    def run_in_background_thread(self):
983        """
984        Starts the validator's operations in a background thread upon entering the context.
985        This method facilitates the use of the validator in a 'with' statement.
986        """
987        if not self.is_running:
988            logger.debug("Starting validator in background thread.")
989            self.should_exit = False
990            self.thread = threading.Thread(target=self.run, daemon=True)
991            self.thread.start()
992            self.is_running = True
993            logger.debug("Started")
994
995    def __enter__(self):
996        self.run_in_background_thread()
997        return self
998
999    def __exit__(self, exc_type, exc_value, traceback):
1000        """
1001        Stops the validator's background operations upon exiting the context.
1002        This method facilitates the use of the validator in a 'with' statement.
1003
1004        Args:
1005            exc_type: The type of the exception that caused the context to be exited.
1006                      None if the context was exited without an exception.
1007            exc_value: The instance of the exception that caused the context to be exited.
1008                       None if the context was exited without an exception.
1009            traceback: A traceback object encoding the stack trace.
1010                       None if the context was exited without an exception.
1011        """
1012        if self.is_running:
1013            logger.debug("Stopping validator in background thread.")
1014            self.should_exit = True
1015            self.thread.join(5)
1016            self.is_running = False
1017            logger.debug("Stopped")
1018
1019
1020
1021---
1022File: /neural_condense_core/common/__init__.py
1023---
1024
1025from .rate_limit import build_rate_limit
1026from . import base64
1027from .file import clean_tmp_directory
1028
1029__all__ = ["build_rate_limit", "base64", "clean_tmp_directory"]
1030
1031
1032
1033---
1034File: /neural_condense_core/common/base64.py
1035---
1036
1037import numpy as np
1038import base64
1039import io
1040
1041
1042def ndarray_to_base64(array: np.ndarray) -> str:
1043    """Convert a NumPy array to a base64-encoded string."""
1044    try:
1045        buffer = io.BytesIO()
1046        np.save(buffer, array)
1047        buffer.seek(0)
1048        base64_str = base64.b64encode(buffer.read()).decode("utf-8")
1049    except Exception:
1050        base64_str = ""
1051    return base64_str
1052
1053
1054def base64_to_ndarray(base64_str: str) -> np.ndarray:
1055    """Convert a base64-encoded string back to a NumPy array."""
1056    try:
1057        buffer = io.BytesIO(base64.b64decode(base64_str))
1058        buffer.seek(0)
1059        array = np.load(buffer)
1060        array = array.astype(np.float32)
1061    except Exception as e:
1062        print(f"Base64 to ndarray error: {e}")
1063        array = np.array([])
1064    return array
1065
1066
1067
1068---
1069File: /neural_condense_core/common/file.py
1070---
1071
1072import hf_transfer
1073import numpy as np
1074import io
1075import time
1076import httpx
1077import os
1078from rich.progress import track
1079from ..logger import logger
1080import asyncio
1081import sys
1082import uuid
1083from concurrent.futures import ThreadPoolExecutor
1084from firerequests import FireRequests
1085
1086fire_downloader = FireRequests()
1087
1088
1089def clean_tmp_directory():
1090    """Clean the tmp directory if running as validator."""
1091    if (
1092        __name__ != "__main__"
1093        and os.path.basename(os.path.abspath(sys.argv[0])) == "validator.py"
1094    ):
1095        os.makedirs("tmp", exist_ok=True)
1096        for file in track(os.listdir("tmp"), description="Cleaning tmp directory"):
1097            os.remove(os.path.join("tmp", file))
1098
1099
1100def _check_file_size(response: httpx.Response, max_size_mb: int) -> tuple[bool, str]:
1101    """Check if file size is within limits."""
1102    content_length = int(response.headers.get("content-length", 0))
1103    max_size_bytes = max_size_mb * 1024 * 1024
1104
1105    if content_length > max_size_bytes:
1106        return (
1107            False,
1108            f"File too large: {content_length / (1024 * 1024):.1f}MB exceeds {max_size_mb}MB limit",
1109        )
1110    return True, ""
1111
1112
1113def _generate_filename(url: str) -> str:
1114    """Generate a unique filename for downloaded file."""
1115    return os.path.join("tmp", str(uuid.uuid4()) + "_" + url.split("/")[-1])
1116
1117
1118async def _download(url: str) -> tuple[str, float, str]:
1119    """Download file using hf_transfer."""
1120    debug_start_time = time.time()
1121    try:
1122        filename = _generate_filename(url)
1123        start_time = time.time()
1124
1125        await fire_downloader.download_file(
1126            url=url,
1127            filename=filename,
1128            max_files=10,  # Number of parallel downloads
1129            chunk_size=1024 * 1024,  # 1 MB chunks
1130            parallel_failures=3,
1131            max_retries=5,
1132            headers=None,
1133            show_progress=False,
1134        )
1135
1136        download_time = time.time() - start_time
1137        logger.info(f"Time taken to download: {download_time:.2f} seconds")
1138        return filename, download_time, ""
1139    except Exception as e:
1140        return "", time.time() - debug_start_time, "Download failed: " + str(e)
1141
1142
1143def _load_and_cleanup(filename: str) -> tuple[np.ndarray | None, str]:
1144    """Load NPY file and convert to float32."""
1145    try:
1146        with open(filename, "rb") as f:
1147            buffer = io.BytesIO(f.read())
1148            data = np.load(buffer)
1149        return data.astype(np.float32), ""
1150    except Exception as e:
1151        logger.error(f"Error loading NPY file: {e}")
1152        return None, str(e)
1153
1154
1155async def load_npy_from_url(
1156    url: str, max_size_mb: int = 1024
1157) -> tuple[np.ndarray | None, str, float, str]:
1158    """
1159    Load a `.npy` file from a URL using the hf_transfer library for efficient downloading.
1160
1161    Args:
1162        url (str): URL of the `.npy` file.
1163        max_size_mb (int): Maximum allowed file size in megabytes.
1164
1165    Returns:
1166        tuple: (data, filename, download_time, error_message)
1167            - data: Loaded NumPy array or None if error
1168            - filename: Local filename where data was saved
1169            - download_time: Time taken to download in seconds
1170            - error_message: Empty string if successful, error description if failed
1171    """
1172    try:
1173        # Check file size using HTTP HEAD request
1174        async with httpx.AsyncClient() as client:
1175            response = await client.head(url)
1176            if response.status_code != 200:
1177                return (
1178                    None,
1179                    "",
1180                    0,
1181                    f"Failed to fetch file info: HTTP {response.status_code}",
1182                )
1183
1184            size_ok, error = _check_file_size(response, max_size_mb)
1185            if not size_ok:
1186                return None, "", 0, error
1187
1188        # Download and process file in thread pool asyncio
1189        filename, download_time, error = await _download(url)
1190        if error:
1191            return None, "", 0, error
1192
1193        data, error = _load_and_cleanup(filename)
1194        if error:
1195            return None, "", 0, error
1196
1197        return data, filename, download_time, ""
1198
1199    except Exception as e:
1200        return None, "", 0, str(e)
1201
1202
1203# Clean tmp directory on module load if running as validator
1204clean_tmp_directory()
1205
1206
1207
1208---
1209File: /neural_condense_core/common/rate_limit.py
1210---
1211
1212import pandas as pd
1213from ..logger import logger
1214from ..constants import constants
1215
1216
1217def build_rate_limit(metagraph, config=None, tier=None):
1218    S = metagraph.S
1219    if config and config.whitelist_uids:
1220        whitelist_uids = [int(uid) for uid in config.whitelist_uids.split(",")]
1221    else:
1222        whitelist_uids = [i for i in range(len(S)) if S[i] > constants.MIN_STAKE]
1223
1224    selected_tier_config = constants.TIER_CONFIG[tier or config.miner.tier]
1225    rpe = selected_tier_config.requests_per_epoch
1226
1227    # Calculate total stake of whitelisted UIDs
1228    total_stake = sum(S[uid] for uid in whitelist_uids)
1229
1230    # Compute rate limits based on normalized stakes
1231    rate_limits = {}
1232    for uid in whitelist_uids:
1233        normalized_stake = S[uid] / total_stake if total_stake > 0 else 0
1234        rate_limits[uid] = max(int(rpe * normalized_stake), 10)
1235
1236    # Set rate limit to 0 for non-whitelisted UIDs
1237    for uid in range(len(S)):
1238        if uid not in whitelist_uids:
1239            rate_limits[uid] = 0
1240
1241    _df = pd.DataFrame(
1242        {
1243            "uids": whitelist_uids,
1244            "rate_limits": [rate_limits[uid] for uid in whitelist_uids],
1245        }
1246    )
1247    logger.info(f"Rate limits for tier {tier}:\n{_df.to_markdown()}")
1248    return rate_limits
1249
1250
1251
1252---
1253File: /neural_condense_core/miner_utils/__init__.py
1254---
1255
1256from .rate_limit_counter import RateLimitCounter
1257
1258__all__ = ["RateLimitCounter"]
1259
1260
1261
1262---
1263File: /neural_condense_core/miner_utils/rate_limit_counter.py
1264---
1265
1266import time
1267
1268
1269class RateLimitCounter:
1270    def __init__(self, rate_limit: int, period: int):
1271        self.rate_limit = rate_limit
1272        self.period = period
1273        self.count = 0
1274        self.last_reset = time.time()
1275
1276    def is_first_request(self):
1277        return self.count == 0
1278
1279    def increment(self):
1280        now = time.time()
1281        if now - self.last_reset > self.period:
1282            self.count = 0
1283            self.last_reset = now
1284        self.count += 1
1285        return self.count <= self.rate_limit
1286
1287    def reset(self):
1288        self.count = 0
1289        self.last_reset = time.time()
1290
1291    def get_count(self):
1292        return self.count
1293
1294    def get_rate_limit(self):
1295        return self.rate_limit
1296
1297    def get_period(self):
1298        return self.period
1299
1300
1301
1302---
1303File: /neural_condense_core/validator_utils/loop/__init__.py
1304---
1305
1306from .forward import (
1307    get_task_config,
1308    initialize_wandb,
1309    query_miners,
1310    prepare_synapse,
1311    validate_responses,
1312    process_and_score_responses,
1313)
1314from . import logging
1315
1316__all__ = [
1317    "get_task_config",
1318    "initialize_wandb",
1319    "query_miners",
1320    "prepare_synapse",
1321    "validate_responses",
1322    "process_and_score_responses",
1323    "logging",
1324]
1325
1326
1327
1328---
1329File: /neural_condense_core/validator_utils/loop/forward.py
1330---
1331
1332import neural_condense_core as ncc
1333import bittensor as bt
1334import random
1335import httpx
1336import wandb
1337from ...protocol import TextCompressProtocol
1338from ...logger import logger
1339from ..synthesizing.challenge_generator import ChallengeGenerator
1340from ..managing.miner_manager import MinerManager, ServingCounter, MinerMetadata
1341from ...constants import SyntheticTaskConfig, TierConfig
1342import asyncio
1343import os
1344import traceback
1345
1346
1347def get_task_config() -> SyntheticTaskConfig:
1348    """
1349    Get a random task configuration based on weights.
1350
1351    Returns:
1352        SyntheticTaskConfig: The selected task configuration
1353    """
1354    return random.choices(
1355        ncc.constants.SYNTHETIC_TASK_CONFIG,
1356        weights=[t.weight for t in ncc.constants.SYNTHETIC_TASK_CONFIG],
1357    )[0]
1358
1359
1360async def prepare_synapse(
1361    challenge_generator: ChallengeGenerator,
1362    tier: str,
1363    task_config: SyntheticTaskConfig,
1364    tier_config: TierConfig,
1365    model_name: str,
1366) -> TextCompressProtocol:
1367    """
1368    Prepare a synapse for validation.
1369
1370    Args:
1371        tokenizer: The tokenizer to use
1372        task_config (SyntheticTaskConfig): Configuration for the task
1373        tier_config (TierConfig): Configuration for the tier
1374        model_name (str): Name of the model to use
1375
1376    Returns:
1377        The prepared synapse object
1378    """
1379    try:
1380        synapse = await challenge_generator.generate_challenge(
1381            model_name=model_name,
1382            tier=tier,
1383            task=task_config.task,
1384            max_context_length_in_chars=tier_config.max_context_length_in_chars,
1385        )
1386        synapse.target_model = model_name
1387        synapse.tier = tier
1388    except Exception as e:
1389        logger.error(f"Error generating challenge: {e}")
1390        traceback.print_exc()
1391        return None
1392    return synapse
1393
1394
1395async def query_miners(
1396    dendrite: bt.dendrite,
1397    metagraph: bt.metagraph,
1398    uids: list[int],
1399    synapse,
1400    timeout: int,
1401) -> list[TextCompressProtocol]:
1402    """
1403    Query a group of miners with a synapse.
1404
1405    Args:
1406        dendrite: The dendrite connection
1407        uids (list[int]): List of miner UIDs to query
1408        synapse: The synapse to send
1409        timeout (int): Query timeout in seconds
1410
1411    Returns:
1412        list: Responses from the miners
1413    """
1414    batched_uids = [
1415        uids[i : i + ncc.constants.BATCH_SIZE]
1416        for i in range(0, len(uids), ncc.constants.BATCH_SIZE)
1417    ]
1418    all_responses = []
1419    for batch_uids in batched_uids:
1420        responses = await dendrite.forward(
1421            axons=[metagraph.axons[uid] for uid in batch_uids],
1422            synapse=synapse,
1423            deserialize=False,
1424            timeout=timeout,
1425        )
1426        all_responses.extend(responses)
1427    return all_responses
1428
1429
1430async def validate_responses(
1431    responses: list[TextCompressProtocol],
1432    uids: list[int],
1433    tier_config: TierConfig,
1434    tier: str,
1435    tokenizer=None,
1436    ground_truth_synapse: TextCompressProtocol = None,
1437) -> tuple[list[TextCompressProtocol], list[int], list[int], list[str]]:
1438    valid_responses, valid_uids, invalid_uids, invalid_reasons = [], [], [], []
1439
1440    # Add recursion limit protection
1441    async def verify_single_response(response):
1442        try:
1443            # Add timeout to prevent hanging
1444            is_valid, reason = await asyncio.wait_for(
1445                TextCompressProtocol.verify(
1446                    response,
1447                    tier_config,
1448                    tier,
1449                    tokenizer,
1450                    ground_truth_synapse,
1451                ),
1452                timeout=360,
1453            )
1454            return is_valid, reason
1455        except asyncio.TimeoutError:
1456            return False, "Verification timeout"
1457        except RecursionError:
1458            return False, "Recursion limit exceeded"
1459        except Exception as e:
1460            return False, f"Failed to verify: {str(e)}"
1461
1462    results = []
1463    for response in responses:
1464        verify_result = await verify_single_response(response)
1465        results.append(verify_result)
1466
1467    # Process results maintaining order
1468    for uid, (is_valid, reason), response in zip(uids, results, responses):
1469        if is_valid:
1470            valid_responses.append(response)
1471            valid_uids.append(uid)
1472        else:
1473            invalid_uids.append(uid)
1474            invalid_reasons.append(reason)
1475
1476    return valid_responses, valid_uids, invalid_uids, invalid_reasons
1477
1478
1479async def process_and_score_responses(
1480    miner_manager: MinerManager,
1481    valid_responses: list[TextCompressProtocol],
1482    valid_uids: list[int],
1483    invalid_uids: list[int],
1484    ground_truth_synapse: TextCompressProtocol,
1485    model_name: str,
1486    task_config: SyntheticTaskConfig,
1487    tier_config: TierConfig,
1488    config: bt.config = None,
1489    invalid_reasons: list[str] = [],
1490    timeout: int = 120,
1491    tier: str = "",
1492) -> dict[str, list]:
1493    if len(valid_responses) > 0:
1494        accuracies, accelerate_rewards = await get_accuracies(
1495            valid_responses=valid_responses,
1496            ground_truth_synapse=ground_truth_synapse,
1497            model_name=model_name,
1498            task_config=task_config,
1499            timeout=timeout,
1500            config=config,
1501            tier=tier,
1502        )
1503        scores = [
1504            (
1505                accu * (1 - tier_config.accelerate_reward_scalar)
1506                + accel * tier_config.accelerate_reward_scalar
1507            )
1508            * (accu > 0)
1509            for accu, accel in zip(accuracies, accelerate_rewards)
1510        ] + [0] * len(invalid_uids)
1511    else:
1512        scores = [0] * len(valid_uids) + [0] * len(invalid_uids)
1513        accuracies = []
1514        accelerate_rewards = []
1515    total_uids = valid_uids + invalid_uids
1516    updated_scores, previous_scores = miner_manager.update_scores(
1517        scores=scores,
1518        total_uids=total_uids,
1519    )
1520    score_changes = [
1521        f"{round(previous_scores[i], 3)} -> {round(updated_scores[i], 3)}"
1522        for i in range(len(previous_scores))
1523    ]
1524    logs = {
1525        "uid": total_uids,
1526        "accuracy": accuracies + [0] * len(invalid_uids),
1527        "accelerate_reward": accelerate_rewards + [0] * len(invalid_uids),
1528        "score_change": score_changes,
1529        "invalid_reasons": [""] * len(valid_uids) + invalid_reasons,
1530    }
1531    return logs, total_uids
1532
1533
1534def update_metrics_of_invalid_miners(
1535    invalid_uids: list[int],
1536    metrics: dict,
1537):
1538    for metric_name, values in metrics.items():
1539        values.extend([0] * len(invalid_uids))
1540    return metrics
1541
1542
1543async def get_accuracies(
1544    valid_responses: list,
1545    ground_truth_synapse: TextCompressProtocol,
1546    model_name: str,
1547    task_config: SyntheticTaskConfig,
1548    timeout: int = 240,
1549    config: bt.config = None,
1550    tier: str = "",
1551) -> tuple[list, list]:
1552    payload = TextCompressProtocol.get_scoring_payload(
1553        responses=valid_responses,
1554        ground_truth_synapse=ground_truth_synapse,
1555        target_model=model_name,
1556        criterias=task_config.criterias,
1557    ).model_dump()
1558    if tier == "universal":
1559        url = f"http://{config.validator.universal_score_backend.host}:{config.validator.universal_score_backend.port}/get_metrics"
1560    else:
1561        url = f"http://{config.validator.score_backend.host}:{config.validator.score_backend.port}/get_metrics"
1562    logger.info(f"Sending payload to scoring backend: {url}")
1563    async with httpx.AsyncClient() as client:
1564        try:
1565            response = await client.post(
1566                url,
1567                json=payload,
1568                timeout=timeout,
1569            )
1570        except Exception as e:
1571            logger.error(f"Error sending payload to scoring backend: {e}")
1572        for r in valid_responses:
1573            try:
1574                if r.util_data.local_filename:
1575                    os.remove(r.util_data.local_filename)
1576            except Exception as e:
1577                logger.error(
1578                    f"Error removing local file {r.util_data.local_filename}: {e}"
1579                )
1580        logger.info("Removed all local files")
1581        if response.status_code != 200:
1582            raise Exception(
1583                f"Scoring backend returned status code {response.status_code}"
1584            )
1585        scoring_response = response.json()
1586
1587    accuracies = scoring_response["metrics"]["accuracy"]
1588    accelerate_rewards = [r.accelerate_score for r in valid_responses]
1589    return accuracies, accelerate_rewards
1590
1591
1592def initialize_wandb(dendrite: bt.dendrite, metagraph: bt.metagraph, uid: int):
1593    try:
1594        message = "incentivized-decentralzied-condensed-ai" + "-".join(
1595            random.choices("0123456789abcdef", k=16)
1596        )
1597        signature = f"0x{dendrite.keypair.sign(message).hex()}"
1598        wandb.init(
1599            project="Neural-Condense-Subnet",
1600            name=f"validator-{uid}",
1601            entity="toilaluan",
1602            job_type="validation",
1603            group="validator",
1604            resume="allow",
1605            config={
1606                "signature": signature,
1607                "uid": uid,
1608                "message": message,
1609                "ss58_address": metagraph.hotkeys[uid],
1610            },
1611        )
1612    except Exception as e:
1613        logger.error(f"Starting wandb error: {e}")
1614
1615
1616
1617---
1618File: /neural_condense_core/validator_utils/loop/logging.py
1619---
1620
1621import wandb
1622import pandas as pd
1623from ...logger import logger
1624
1625
1626def log_wandb(logs: dict, uids: list[int], tier=""):
1627    try:
1628        for metric, values in logs.items():
1629            if metric == "perplexity":
1630                for uid, value in zip(uids, values):
1631                    if value is None or not isinstance(value, float):
1632                        continue
1633                    wandb.log({f"{tier}-{uid}/perplexity": abs(value)})
1634    except Exception as e:
1635        logger.error(f"Error logging to wandb: {e}")
1636
1637
1638def log_as_dataframe(data: dict):
1639    for metric, values in data.items():
1640        for i in range(len(values)):
1641            if values[i] is None:
1642                values[i] = "N/A"
1643            if isinstance(values[i], float):
1644                values[i] = round(values[i], 2)
1645    df = pd.DataFrame(data)
1646    return df
1647
1648
1649
1650---
1651File: /neural_condense_core/validator_utils/managing/__init__.py
1652---
1653
1654from .miner_manager import MinerManager, ServingCounter
1655
1656__all__ = [
1657    "MinerManager",
1658    "ServingCounter",
1659]
1660
1661
1662
1663---
1664File: /neural_condense_core/validator_utils/managing/metric_converter.py
1665---
1666
1667from ...constants import TierConfig
1668
1669
1670class MetricConverter:
1671    def __init__(self):
1672        self.converters = {
1673            "perplexity": self.perplexity_to_score,
1674            "accuracy": self.accuracy_to_score,
1675            "bleu": self.bleu_to_score,
1676        }
1677
1678    def convert_metrics_to_score(
1679        self, metrics: dict, tier_config: TierConfig
1680    ) -> dict[str, list[float]]:
1681        total_scores = {}
1682        accelerate_bonuses = self.get_accelerate_bonuses(metrics, tier_config)
1683        for metric, values in metrics.items():
1684            try:
1685                converter = self.converters[metric]
1686                scores = converter(values)
1687                scores = [
1688                    s * (1 + a) if a is not None else s
1689                    for s, a in zip(scores, accelerate_bonuses)
1690                ]
1691                total_scores[metric] = scores
1692            except KeyError:
1693                continue
1694        return total_scores
1695
1696    def perplexity_to_score(self, perplexities: list[float]):
1697        valid_perplexities = [p for p in perplexities if p is not None]
1698        if not valid_perplexities:
1699            return perplexities
1700        pivot = min(valid_perplexities)
1701        scores = [pivot / p if p is not None else None for p in perplexities]
1702        return scores
1703
1704    def accuracy_to_score(self, accuracies: list[float]):
1705        return accuracies
1706
1707    def get_accelerate_bonuses(self, metrics: dict, tier_config: TierConfig):
1708        accelerate_metrics = metrics["accelerate_metrics"]
1709        return [
1710            s * tier_config.accelerate_reward_scalar if s is not None else None
1711            for s in accelerate_metrics
1712        ]
1713
1714    def bleu_to_score(self, bleus: list[float]):
1715        return bleus
1716
1717
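
To make the conversion concrete, here is a minimal sketch of the perplexity-to-score and accelerate-bonus logic above, using made-up numbers (the 0.1 scalar mirrors `accelerate_reward_scalar` in the tier config later in this listing):

```python
# Lower perplexity is better: each miner is scored relative to the best
# (pivot) perplexity, then boosted by its accelerate bonus.
perplexities = [2.0, 4.0, None]
pivot = min(p for p in perplexities if p is not None)            # 2.0
scores = [pivot / p if p is not None else None for p in perplexities]
# scores == [1.0, 0.5, None]

accelerate_metrics = [0.5, 0.2, None]
accelerate_reward_scalar = 0.1                                   # per-tier value
bonuses = [
    a * accelerate_reward_scalar if a is not None else None
    for a in accelerate_metrics
]
final = [s * (1 + b) if b is not None else s for s, b in zip(scores, bonuses)]
# final == [1.05, 0.51, None]
```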
1718
1719---
1720File: /neural_condense_core/validator_utils/managing/miner_manager.py
1721---
1722
1723import bittensor as bt
1724import numpy as np
1725import httpx
1726import pandas as pd
1727import time
1728import asyncio
1729from .metric_converter import MetricConverter
1730from ...common import build_rate_limit
1731from ...protocol import Metadata
1732from ...constants import constants
1733from .utils import (
1734    apply_top_percentage_threshold,
1735    standardize_scores,
1736    normalize_and_weight_scores,
1737)
1738from ...logger import logger
1739import redis
1740from sqlalchemy import create_engine, Column, Integer, String, Float
1741from sqlalchemy.ext.declarative import declarative_base
1742from sqlalchemy.orm import sessionmaker
1743
1744Base = declarative_base()
1745
1746
1747class MinerMetadata(Base):
1748    """
1749    SQLAlchemy model for storing miner metadata.
1750
1751    Attributes:
1752        uid (int): Unique identifier for the miner
1753        tier (str): Miner's tier level (default: "unknown")
1754        score (float): Miner's performance score (default: 0.0)
1755    """
1756
1757    __tablename__ = "miner_metadata"
1758
1759    uid = Column(Integer, primary_key=True)
1760    tier = Column(String, default="unknown")
1761    score = Column(Float, default=0.0)
1762
1763    def __init__(self, uid, tier="unknown", score=0.0):
1764        self.uid = uid
1765        self.tier = tier
1766        self.score = score
1767
1768    def to_dict(self):
1769        """Convert metadata to dictionary format."""
1770        return {"uid": self.uid, "tier": self.tier, "score": self.score}
1771
1772
1773class ServingCounter:
1774    """
1775    Redis-based rate limiter for miner requests.
1776
1777    Uses atomic Redis operations to track and limit request rates per miner.
1778
1779    Attributes:
1780        rate_limit (int): Max requests allowed per epoch
1781        redis_client (redis.Redis): Redis connection for distributed counting
1782        key (str): Unique Redis key for this counter
1783        Counter keys expire after constants.EPOCH_LENGTH seconds (set on first increment)
1784    """
1785
1786    def __init__(
1787        self,
1788        rate_limit: int,
1789        uid: int,
1790        tier: str,
1791        redis_client: redis.Redis,
1792        postfix_key: str = "",
1793    ):
1794        self.rate_limit = rate_limit
1795        self.redis_client = redis_client
1796        self.key = constants.DATABASE_CONFIG.redis.serving_counter_key_format.format(
1797            tier=tier,
1798            uid=uid,
1799        ) + str(postfix_key)
1800
1801    def increment(self) -> bool:
1802        """
1803        Increment request counter and check rate limit.
1804
1805        Uses atomic Redis INCR operation and sets expiry on first increment.
1806
1807        Reset the counter after EPOCH_LENGTH seconds.
1808
1809        Returns:
1810            bool: True if under rate limit, False if exceeded
1811        """
1812        count = self.redis_client.incr(self.key)
1813
1814        if count == 1:
1815            self.redis_client.expire(self.key, constants.EPOCH_LENGTH)
1816
1817        if count <= self.rate_limit:
1818            return True
1819
1820        logger.info(f"Rate limit exceeded for {self.key}")
1821        return False
1822
1823    def get_current_count(self):
1824        return self.redis_client.get(self.key)
1825
1826    def reset_counter(self):
1827        self.redis_client.set(self.key, 0)
1828
1829
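
The counter above follows the standard Redis INCR-then-EXPIRE pattern. A minimal sketch of that pattern, assuming a Redis server is reachable at localhost:6379 and using a hypothetical key:

```python
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
KEY = "serving_counter:research:1"   # hypothetical key for tier "research", uid 1
RATE_LIMIT = 3
EPOCH_LENGTH = 600                   # seconds, mirrors constants.EPOCH_LENGTH

def allow_request() -> bool:
    count = r.incr(KEY)              # atomic increment across processes
    if count == 1:
        r.expire(KEY, EPOCH_LENGTH)  # counter window starts on the first request
    return count <= RATE_LIMIT

# The first RATE_LIMIT calls per window return True; later calls return False
# until the key expires and the window resets.
```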
1830class MinerManager:
1831    """
1832    Manages miner metadata, scoring and rate limiting.
1833
1834    Handles:
1835    - Miner metadata storage and updates
1836    - Performance scoring and normalization
1837    - Request rate limiting per miner
1838    - Synchronization with validator network
1839
1840    Attributes:
1841        wallet: Bittensor wallet
1842        dendrite: Network communication client
1843        metagraph: Network state/topology
1844        config: Validator configuration
1845        redis_client: Redis connection
1846        session: SQLAlchemy database session
1847        metric_converter: Converts raw metrics to scores
1848        rate_limit_per_tier: Request limits by tier
1849    """
1850
1851    def __init__(self, uid, wallet, metagraph, config=None):
1852        self.is_main_process = bool(config)
1853        self.config = config
1854        self.uid = uid
1855        self.wallet = wallet
1856        self.dendrite = bt.dendrite(wallet=self.wallet)
1857        self.metagraph = metagraph
1858
1859        # Initialize Redis
1860        redis_config = constants.DATABASE_CONFIG.redis
1861        self.redis_client = redis.Redis(
1862            host=redis_config.host, port=redis_config.port, db=redis_config.db
1863        )
1864
1865        # Initialize SQLAlchemy
1866        self.engine = create_engine(constants.DATABASE_CONFIG.sql.url)
1867        Base.metadata.create_all(self.engine)
1868        Session = sessionmaker(bind=self.engine)
1869        self.session = Session()
1870
1871        self._init_metadata()
1872        self.metric_converter = MetricConverter()
1873        self.rate_limit_per_tier = self.get_rate_limit_per_tier()
1874        logger.info(f"Rate limit per tier: {self.rate_limit_per_tier}")
1875
1876        self.loop = asyncio.get_event_loop()
1877        self.loop.run_until_complete(self.sync())
1878
1879    def get_metadata(self, uids: list[int] = []) -> dict[int, MinerMetadata]:
1880        """
1881        Get metadata for specified miner UIDs.
1882
1883        Args:
1884            uids: List of miner UIDs to fetch. Empty list returns all miners.
1885
1886        Returns:
1887            dict: Mapping of UID to miner metadata
1888        """
1889        query = self.session.query(MinerMetadata)
1890        if uids:
1891            query = query.filter(MinerMetadata.uid.in_(uids))
1892        return {miner.uid: miner for miner in query.all()}
1893
1894    def update_scores(self, scores: list[float], total_uids: list[int]):
1895        """
1896        Update miner scores with exponential moving average.
1897
1898        Args:
1899            scores: New performance scores
1900            total_uids: UIDs corresponding to scores
1901
1902        Returns:
1903            tuple: Updated scores and previous scores
1904        """
1905        updated_scores = []
1906        previous_scores = []
1907
1908        for uid, score in zip(total_uids, scores):
1909            miner = self.session.query(MinerMetadata).get(uid)
1910            previous_scores.append(miner.score)
1911
1912            # EMA with 0.9 decay factor
1913            miner.score = miner.score * 0.9 + score * 0.1
1914            miner.score = max(0, miner.score)
1915            updated_scores.append(miner.score)
1916
1917        self.session.commit()
1918        return updated_scores, previous_scores
1919
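
Numerically, the exponential moving average above weights history at 0.9 and the new observation at 0.1, then clamps at zero; a one-line worked example:

```python
# EMA update with 0.9 decay: a previous score of 0.50 and a new score of 0.80
# move the stored score to 0.53.
previous, new_observation = 0.50, 0.80
updated = max(0, previous * 0.9 + new_observation * 0.1)
assert round(updated, 2) == 0.53
```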
1920    def get_normalized_ratings(self, top_percentage: float = 1.0) -> np.ndarray:
1921        """
1922        Calculate normalized ratings across all miners.
1923
1924        Applies:
1925        1. Top percentage thresholding
1926        2. Score standardization
1927        3. Sigmoid compression
1928        4. Tier-based incentive weighting
1929
1930        Args:
1931            top_percentage: Fraction of top miners to consider
1932
1933        Returns:
1934            np.ndarray: Normalized ratings for all miners
1935        """
1936        weights = np.zeros(len(self.metagraph.hotkeys))
1937
1938        for tier in constants.TIER_CONFIG:
1939            tier_weights = self._get_tier_weights(tier, top_percentage)
1940            for uid, weight in tier_weights.items():
1941                weights[uid] = weight
1942
1943        return weights
1944
1945    def _get_tier_weights(self, tier: str, top_percentage: float) -> dict[int, float]:
1946        """
1947        Calculate weights for miners in a specific tier.
1948
1949        Args:
1950            tier: The tier to calculate weights for
1951            top_percentage: Fraction of top miners to consider
1952
1953        Returns:
1954            dict: Mapping of UID to weight for miners in tier
1955        """
1956        # Get scores for miners in this tier
1957        miners = self.session.query(MinerMetadata).filter_by(tier=tier).all()
1958        tier_scores = [m.score for m in miners]
1959        tier_uids = [m.uid for m in miners]
1960
1961        if not tier_scores:
1962            return {}
1963
1964        scores = apply_top_percentage_threshold(tier_scores, tier_uids, top_percentage)
1965        scores = normalize_and_weight_scores(scores, tier)
1966
1967        return dict(zip(tier_uids, scores))
1968
1969    def _init_metadata(self):
1970        """Initialize metadata entries for all miners."""
1971        for uid in self.metagraph.uids:
1972            # Query.get() returns None for a missing uid instead of raising,
1973            # so check the result explicitly before adding a fresh entry.
1974            if self.session.query(MinerMetadata).get(uid) is None:
1975                logger.info(f"Initializing metadata for uid {uid}")
1976                self.session.add(MinerMetadata(uid=uid))
1977        self.session.commit()
1978
1979    async def sync(self):
1980        """Synchronize metadata and rate limiters."""
1981        logger.info("Synchronizing metadata and serving counters.")
1982        self.rate_limit_per_tier = self.get_rate_limit_per_tier()
1983        logger.info(f"Rate limit per tier: {self.rate_limit_per_tier}")
1984
1985        self.serving_counter = self._create_serving_counter()
1986
1987        if self.is_main_process:
1988            await self._update_metadata()
1989            self._log_metadata()
1990
1991    def _log_metadata(self):
1992        """Log current metadata as formatted DataFrame."""
1993        metadata = {
1994            m.uid: {"tier": m.tier, "elo_rating": m.score * 100}
1995            for m in self.session.query(MinerMetadata).all()
1996        }
1997        df = pd.DataFrame(metadata).T.reset_index()
1998        df.columns = ["uid", "tier", "elo_rating"]
1999        logger.info("Metadata:\n" + df.to_markdown())
2000
2001    async def report_metadata(self):
2002        """Report metadata to validator server."""
2003        metadata = {
2004            m.uid: {"tier": m.tier, "elo_rating": m.score * 100}
2005            for m in self.session.query(MinerMetadata).all()
2006        }
2007        await self.report(metadata, "api/report-metadata")
2008
2009    async def report(self, payload: dict, endpoint: str):
2010        """
2011        Send signed report to validator server.
2012
2013        Args:
2014            payload: Data to report
2015            endpoint: API endpoint path
2016        """
2017        url = f"{self.config.validator.report_url}/{endpoint}"
2018        nonce = str(time.time_ns())
2019        signature = f"0x{self.dendrite.keypair.sign(nonce).hex()}"
2020
2021        headers = {
2022            "Content-Type": "application/json",
2023            "message": nonce,
2024            "ss58_address": self.wallet.hotkey.ss58_address,
2025            "signature": signature,
2026        }
2027
2028        async with httpx.AsyncClient() as client:
2029            response = await client.post(
2030                url,
2031                json=payload,
2032                headers=headers,
2033                timeout=32,
2034            )
2035
2036        if response.status_code != 200:
2037            logger.error(f"Failed to report to {endpoint}. Response: {response.text}")
2038        else:
2039            logger.info(f"Reported to {endpoint}.")
2040
2041    async def _update_metadata(self):
2042        """Update metadata by querying miner status."""
2043        synapse = Metadata()
2044        uids = list(range(len(self.metagraph.hotkeys)))
2045        axons = [self.metagraph.axons[uid] for uid in uids]
2046
2047        responses = await self.dendrite.forward(
2048            axons,
2049            synapse,
2050            deserialize=False,
2051            timeout=16,
2052        )
2053
2054        for uid, response in zip(uids, responses):
2055            miner = self.session.query(MinerMetadata).get(uid)
2056            if not miner:
2057                miner = MinerMetadata(uid=uid)
2058                self.session.add(miner)
2059
2060            current_tier = miner.tier
2061            new_tier = current_tier
2062
2063            if response and response.metadata.get("tier") is not None:
2064                new_tier = response.metadata["tier"]
2065
2066            if new_tier != current_tier:
2067                logger.info(
2068                    f"Tier of uid {uid} changed from {current_tier} to {new_tier}"
2069                )
2070                miner.score = 0
2071
2072            miner.tier = new_tier
2073
2074        self.session.commit()
2075        logger.info(f"Updated metadata for {len(uids)} uids")
2076
2077    def get_rate_limit_per_tier(self):
2078        """Get request rate limits for each tier."""
2079        return {
2080            tier: build_rate_limit(self.metagraph, self.config, tier)[self.uid]
2081            for tier in constants.TIER_CONFIG
2082        }
2083
2084    def _create_serving_counter(self):
2085        """
2086        Create rate limiters for each miner by tier.
2087
2088        Returns:
2089            dict: Nested dict of tier -> uid -> counter
2090        """
2091        counters = {tier: {} for tier in constants.TIER_CONFIG}
2092
2093        for miner in self.session.query(MinerMetadata).all():
2094            tier = miner.tier
2095            if tier not in constants.TIER_CONFIG:
2096                continue
2097
2098            counter = ServingCounter(
2099                self.rate_limit_per_tier[tier], miner.uid, tier, self.redis_client
2100            )
2101            counters[tier][miner.uid] = counter
2102
2103        return counters
2104
2105
2106
2107---
2108File: /neural_condense_core/validator_utils/managing/utils.py
2109---
2110
2111import numpy as np
2112from ...constants import constants
2113from ...logger import logger
2114
2115
2116def apply_top_percentage_threshold(
2117    scores: list[float], uids: list[int], top_percentage: float
2118) -> np.ndarray:
2119    """Apply threshold to keep only top percentage of scores."""
2120    n_top = max(1, int(len(scores) * top_percentage))
2121    top_miners = sorted(zip(uids, scores), key=lambda x: x[1], reverse=True)[:n_top]
2122    top_uids = {uid for uid, _ in top_miners}
2123
2124    return np.array(
2125        [score if uid in top_uids else 0 for uid, score in zip(uids, scores)]
2126    )
2127
2128
2129def standardize_scores(scores: np.ndarray, tier: str) -> np.ndarray:
2130    """Standardize non-zero scores using mean and clamped standard deviation."""
2131    nonzero = scores > 0
2132    if not np.any(nonzero):
2133        return scores
2134
2135    curr_std = np.std(scores[nonzero])
2136    curr_mean = np.mean(scores[nonzero])
2137
2138    if curr_std > 0:
2139        target_std = min(curr_std, constants.EXPECTED_MAX_STD_SCORE)
2140        scale = target_std / curr_std
2141
2142        centered = scores[nonzero] - curr_mean
2143        scaled = centered * scale
2144        compressed = np.tanh(scaled * 0.5) * target_std
2145        scores[nonzero] = compressed + constants.EXPECTED_MEAN_SCORE
2146
2147        logger.info(
2148            "adjust_ratings",
2149            tier=tier,
2150            mean=curr_mean,
2151            std=curr_std,
2152            scale_factor=scale,
2153        )
2154
2155    return scores
2156
2157
2158def normalize_and_weight_scores(scores: np.ndarray, tier: str) -> np.ndarray:
2159    """Normalize scores to sum to 1 and apply tier incentive weighting."""
2160    total = np.sum(scores)
2161    if total > 0:
2162        scores = scores / total
2163
2164    # --Smoothing Update---
2165    from datetime import datetime, timezone
2166
2167    current_datetime = datetime.now(timezone.utc)
2168    target_datetime = datetime(2025, 1, 24, 12, 0, 0, tzinfo=timezone.utc)
2169
2170    if current_datetime < target_datetime:
2171        logger.info("Using early incentive scaling")
2172        if tier == "research":
2173            scale = 0.9
2174        elif tier == "universal":
2175            scale = 0.1
2176    else:
2177        logger.info("Using stable incentive scaling")
2178        scale = constants.TIER_CONFIG[tier].incentive_percentage
2179
2180    return scores * scale
2181
2182
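
Putting the two helpers above together, here is a small sketch of the threshold-then-normalize pipeline with hypothetical scores (the 0.6 weight stands in for a tier's `incentive_percentage`):

```python
import numpy as np

uids = [0, 1, 2, 3]
scores = [0.9, 0.1, 0.5, 0.4]
top_percentage = 0.5                                # keep the top 50% of miners

n_top = max(1, int(len(scores) * top_percentage))
ranked = sorted(zip(uids, scores), key=lambda x: x[1], reverse=True)[:n_top]
top_uids = {uid for uid, _ in ranked}
thresholded = np.array([s if u in top_uids else 0 for u, s in zip(uids, scores)])
# thresholded == [0.9, 0.0, 0.5, 0.0]

normalized = thresholded / thresholded.sum()        # sums to 1 within the tier
weighted = normalized * 0.6                         # tier incentive share
# `weighted` sums to the tier's share (0.6 here) of the total incentive.
```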
2183
2184---
2185File: /neural_condense_core/validator_utils/monetize/__init__.py
2186---
2187
2188from .organic_gate import OrganicGate
2189
2190__all__ = [
2191    "OrganicGate",
2192]
2193
2194
2195
2196---
2197File: /neural_condense_core/validator_utils/monetize/organic_gate.py
2198---
2199
2200from fastapi import FastAPI, Depends, Request
2201from fastapi.exceptions import HTTPException
2202import pydantic
2203import asyncio
2204import bittensor as bt
2205import uvicorn
2206from concurrent.futures import ThreadPoolExecutor
2207import random
2208import httpx
2209import time
2210from ...constants import constants
2211from ...protocol import TextCompressProtocol
2212from ..managing import MinerManager
2213from ...logger import logger
2214
2215
2216class OrganicPayload(pydantic.BaseModel):
2217    context: str
2218    tier: str
2219    target_model: str
2220    miner_uid: int = -1
2221    top_incentive: float = 0.9
2222
2223
2224class OrganicResponse(pydantic.BaseModel):
2225    compressed_kv_url: str
2226    miner_uid: int = -1
2227    compressed_context: str
2228
2229
2230class RegisterPayload(pydantic.BaseModel):
2231    port: int
2232
2233
2234class OrganicGate:
2235    def __init__(
2236        self,
2237        miner_manager: MinerManager,
2238        config: bt.config,
2239    ):
2240        self.metagraph: bt.metagraph.__class__ = miner_manager.metagraph
2241        self.miner_manager = miner_manager
2242        self.wallet = miner_manager.wallet
2243        self.config = config
2244        self.dendrite = bt.dendrite(wallet=miner_manager.wallet)
2245        self.app = FastAPI()
2246        self.app.add_api_route(
2247            "/forward",
2248            self.forward,
2249            methods=["POST"],
2250            dependencies=[Depends(self.get_self)],
2251        )
2252        self.app.add_api_route(
2253            "/health",
2254            self.health_check,
2255            methods=["GET"],
2256            dependencies=[Depends(self.get_self)],
2257        )
2258        self.client_axon: bt.AxonInfo = None
2259        self.authentication_key = "".join(random.choices("0123456789abcdef", k=16))
2260
2261    async def _run_function_periodically(self, function, interval):
2262        while True:
2263            logger.info(
2264                f"Running function {function.__name__} every {interval} seconds."
2265            )
2266            try:
2267                await function()
2268            except Exception as e:
2269                logger.error(f"Error running function {function.__name__}: {e}")
2270            await asyncio.sleep(interval)
2271
2272    async def register_to_client(self):
2273        logger.info("Registering to client.")
2274        payload = RegisterPayload(port=self.config.validator.gate_port)
2275        logger.info(f"Payload: {payload}")
2276        try:
2277            response = await self.call(payload, timeout=12)
2278            logger.info(f"Registration response: {response}")
2279        except Exception as e:
2280            logger.error(f"Error during registration: {e}")
2281
2282    async def _authenticate(self, request: Request):
2283        message = request.headers["message"]
2284        if message != self.authentication_key:
2285            raise Exception("Authentication failed.")
2286
2287    async def forward(self, request: Request):
2288        try:
2289            await self._authenticate(request)
2290            logger.info("Forwarding organic request.")
2291            request: OrganicPayload = OrganicPayload(**await request.json())
2292            synapse = TextCompressProtocol(
2293                context=request.context,
2294                target_model=request.target_model,
2295            )
2296            logger.info(f"Context: {request.context[:100]}...")
2297            logger.info(f"Tier: {request.tier}")
2298            logger.info(f"Target model: {request.target_model}")
2299            targeted_uid = None
2300            if request.miner_uid != -1:
2301                counter = self.miner_manager.serving_counter[request.tier][
2302                    request.miner_uid
2303                ]
2304                if counter.increment():
2305                    targeted_uid = request.miner_uid
2306                else:
2307                    logger.warning(f"Miner {request.miner_uid} is under rate limit.")
2308                    raise HTTPException(
2309                        status_code=503,
2310                        detail="Miner is under rate limit.",
2311                    )
2312            else:
2313                metadata = self.miner_manager.get_metadata()
2314                tier_miners = [
2315                    (uid, metadata.score)
2316                    for uid, metadata in metadata.items()
2317                    if metadata.tier == request.tier
2318                ]
2319                tier_miners.sort(key=lambda x: x[1], reverse=True)
2320
2321                # Try top miners until we find one under rate limit
2322                top_k = max(1, int(len(tier_miners) * request.top_incentive))
2323                top_miners = tier_miners[:top_k]
2324                random.shuffle(top_miners)  # Randomize among top performers
2325                logger.info(f"Top {top_k} miners: {top_miners}")
2326
2327                for uid, _ in top_miners:
2328                    if uid in self.miner_manager.serving_counter[request.tier]:
2329                        counter = self.miner_manager.serving_counter[request.tier][uid]
2330                        if counter.increment():
2331                            targeted_uid = uid
2332                            break
2333
2334            if targeted_uid is None:
2335                raise HTTPException(
2336                    status_code=503,
2337                    detail="No miners available.",
2338                )
2339            target_axon = self.metagraph.axons[targeted_uid]
2340            response: TextCompressProtocol = await self.dendrite.forward(
2341                axons=target_axon,
2342                synapse=synapse,
2343                timeout=constants.TIER_CONFIG[request.tier].timeout,
2344                deserialize=False,
2345            )
2346            # asyncio.create_task(self._organic_validating(response, request.tier))
2347            logger.info(
2348                f"Compressed to url: {response.compressed_kv_url}. Process time: {response.dendrite.process_time}"
2349            )
2350        except Exception as e:
2351            logger.error(f"Error: {e}")
2352            raise HTTPException(status_code=503, detail="Validator error.")
2353
2354        return OrganicResponse(
2355            compressed_kv_url=response.compressed_kv_url, miner_uid=targeted_uid, compressed_context=response.compressed_context
2356        )
2357
2358    async def _organic_validating(self, response, tier):
2359        if random.random() < constants.ORGANIC_VERIFY_FREQUENCY:
2360            is_valid, reason = await TextCompressProtocol.verify(
2361                response, constants.TIER_CONFIG[tier]
2362            )
2363
2364            if not is_valid:
2365                logger.warning(f"Invalid response: {reason}")
2366
2367            # TODO: Update miner's score
2368
2369    def start_server(self):
2370        self.executor = ThreadPoolExecutor(max_workers=1)
2371
2372        async def startup():
2373            config = uvicorn.Config(
2374                self.app,
2375                host="0.0.0.0",
2376                port=self.config.validator.gate_port,
2377                loop="asyncio",
2378            )
2379            server = uvicorn.Server(config)
2380            await server.serve()
2381
2382        async def run_all():
2383            registration_task = self._run_function_periodically(
2384                self.register_to_client, 60
2385            )
2386            server_task = startup()
2387            await asyncio.gather(registration_task, server_task)
2388
2389        asyncio.run(run_all())
2390
2391    async def get_self(self):
2392        return self
2393
2394    async def health_check(self):
2395        return {"status": "healthy"}
2396
2397    async def call(
2398        self,
2399        payload: RegisterPayload,
2400        timeout: float = 12.0,
2401    ):
2402        """
2403        Send a signed registration payload to the Organic Client Server.
2404
2405        Args:
2406            payload (RegisterPayload): The payload to send in the request.
2407            timeout (float, optional): Maximum duration to wait for a response
2408                from the client server in seconds. Defaults to ``12.0``.
2409
2410        Returns:
2411            The parsed JSON response from the server on success,
2412            otherwise ``None`` (the failure is logged).
2413        """
2414
2415        url = f"{self.config.validator.organic_client_url}/register"
2416        nonce = str(time.time_ns())
2417        message = self.authentication_key + ":" + nonce
2418        signature = f"0x{self.dendrite.keypair.sign(message).hex()}"
2419
2420        headers = {
2421            "Content-Type": "application/json",
2422            "message": message,
2423            "ss58_address": self.wallet.hotkey.ss58_address,
2424            "signature": signature,
2425        }
2426
2427        async with httpx.AsyncClient() as client:
2428            response = await client.post(
2429                url,
2430                json=payload.model_dump(),
2431                headers=headers,
2432                timeout=timeout,
2433            )
2434
2435        if response.status_code != 200:
2436            logger.error(
2437                f"Failed to register to the Organic Client Server. Response: {response.text}"
2438            )
2439            return
2440        else:
2441            logger.info("Registered to the Organic Client Server.")
2442            return response.json()
2443
2444
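
For illustration only, a client-side request to the gate's `/forward` route might look like the sketch below. The port is hypothetical, and the `message` header must match the gate's randomly generated `authentication_key` (shared with the Organic Client Server during registration), so this will not work against an arbitrary validator:

```python
import httpx

payload = {
    "context": "Long context to compress ...",
    "tier": "research",
    "target_model": "Condense-AI/Mistral-7B-Instruct-v0.2",
    "miner_uid": -1,            # -1 lets the gate pick among top miners
    "top_incentive": 0.9,
}
headers = {"message": "<gate authentication key>"}   # placeholder value

response = httpx.post(
    "http://localhost:8080/forward",                 # hypothetical gate_port
    json=payload,
    headers=headers,
    timeout=64,
)
print(response.json())
# Expected shape: {"compressed_kv_url": ..., "miner_uid": ..., "compressed_context": ...}
```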
2445
2446---
2447File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/__init__.py
2448---
2449
2450from typing import List
2451from .context_loader import load_wikipedia_science_dataset
2452from .infinity_iterable_dataset import InfiniteDataset
2453
2454
2455def load_context_datasets() -> List[InfiniteDataset]:
2456    return [
2457        InfiniteDataset(load_wikipedia_science_dataset().shuffle(seed=42)),
2458    ]
2459
2460
2461
2462---
2463File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/context_loader.py
2464---
2465
2466from datasets import load_dataset
2467
2468
2469def load_wikipedia_science_dataset():
2470    ds = load_dataset(
2471        "Laz4rz/wikipedia_science_chunked_small_rag_512", streaming=True, split="train"
2472    )
2473    ds = ds.shuffle()
2474    ds = ds.filter(lambda x: len(x["text"]) > 512)
2475    ds = ds.map(lambda x: {"context": x["text"]})
2476    print("Loaded wikipedia science dataset")
2477    return ds
2478
2479
2480
2481---
2482File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/infinity_iterable_dataset.py
2483---
2484
2485from typing import Iterator
2486from datasets import IterableDataset
2487
2488
2489class InfiniteDataset:
2490    def __init__(self, dataset: IterableDataset):
2491        """
2492        Initialize the InfiniteDataset wrapper.
2493
2494        Args:
2495            dataset (IterableDataset): The dataset object to wrap. It should be an iterable dataset.
2496        """
2497        self.dataset = dataset
2498        self.iterator = iter(self.dataset)  # Initialize the iterator
2499
2500    def __iter__(self) -> Iterator:
2501        """
2502        Return the iterator for the dataset.
2503
2504        Returns:
2505            Iterator: An iterator over the dataset.
2506        """
2507        return self
2508
2509    def __next__(self):
2510        """
2511        Get the next item in the dataset. Automatically reinitialize the iterator if the end is reached.
2512
2513        Returns:
2514            The next item in the dataset.
2515        """
2516        try:
2517            return next(self.iterator)
2518        except StopIteration:
2519            # Reinitialize iterator if the end is reached
2520            self.iterator = iter(self.dataset)
2521            return next(self.iterator)  # Return the first item of the new iterator
2522
2523
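
Usage is straightforward; a quick sketch (a plain list stands in for an `IterableDataset`, and the import path follows the file layout in this listing):

```python
from neural_condense_core.validator_utils.synthesizing.custom_dataset_loaders.infinity_iterable_dataset import (
    InfiniteDataset,
)

ds = InfiniteDataset([1, 2, 3])
print([next(ds) for _ in range(5)])   # [1, 2, 3, 1, 2] -- iteration wraps around
```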
2524
2525---
2526File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/instruction_loader.py
2527---
2528
2529from datasets import load_dataset, IterableDataset
2530
2531
2532def load_orca_instruct_dataset() -> IterableDataset:
2533    ds = load_dataset(
2534        "mlabonne/orca-agentinstruct-1M-v1-cleaned", split="train", streaming=True
2535    )
2536    ds = ds.map(
2537        lambda x: {
2538            "messages": x["messages"],
2539        }
2540    )
2541    return ds
2542
2543
2544def load_open_math_instruct_dataset() -> IterableDataset:
2545    ds = load_dataset("nvidia/OpenMathInstruct-2", split="train", streaming=True)
2546    ds = ds.map(
2547        lambda x: {
2548            "messages": [
2549                {"role": "user", "content": x["problem"]},
2550                {"role": "assistant", "content": x["generated_solution"]},
2551            ]
2552        }
2553    )
2554    return ds
2555
2556
2557
2558---
2559File: /neural_condense_core/validator_utils/synthesizing/__init__.py
2560---
2561
2562from .challenge_generator import ChallengeGenerator
2563
2564__all__ = [
2565    "ChallengeGenerator",
2566]
2567
2568
2569
2570---
2571File: /neural_condense_core/validator_utils/synthesizing/challenge_generator.py
2572---
2573
2574from transformers import AutoTokenizer
2575import re
2576import substrateinterface as st
2577from .scheduler import Scheduler
2578from .convo_generator import ConvoGenerator
2579from .schemas import QASchedulerConfig
2580import random
2581import os
2582from typing import Tuple, List
2583from ...protocol import TextCompressProtocol
2584from .filter_chunker import FilterExistanceChecker
2585from ...constants import constants, ChatTemplate
2586from .utils import retry
2587
2588CORCEL_API_KEY = os.getenv("CORCEL_API_KEY")
2589CORCEL_BASE_URL = os.getenv(
2590    "CORCEL_BASE_URL", "https://api.corcel.io/v1/text/vision/chat"
2591)
2592GENERATOR_MODEL_ID = os.getenv("GENERATOR_MODEL_ID", "llama-3-1-8b")
2593
2594
2595class ChallengeGenerator:
2596    def __init__(self, keypair: st.Keypair):
2597        """
2598        Initialize the ChallengeGenerator class with various dataset loaders and configuration tokens.
2599        """
2600        self.generator = ConvoGenerator(keypair=keypair)
2601        self.synthesizer = Scheduler(
2602            generator=self.generator,
2603            qa_config=QASchedulerConfig(n_qa_per_context=4, max_items=100),
2604            refresh_time=60.0,
2605        )
2606        self.synthesizer.start()
2607        self.start_activation_token = "<START-ACTIVATE-TOKEN>"
2608        self.end_activation_token = "<END-ACTIVATE-TOKEN>"
2609        self.task_to_builder = {
2610            "question_answering": self._build_qa_conversation,
2611        }
2612        self.filter_checker = FilterExistanceChecker(chunk_size=256)
2613
2614    @retry(max_attempts=3)
2615    async def generate_challenge(
2616        self,
2617        model_name: str,
2618        tier: str,
2619        task: str = "question_answering",
2620        max_context_length_in_chars: int = 10000,
2621    ) -> TextCompressProtocol:
2622        if tier == "universal":
2623            chat_template = None
2624        else:
2625            chat_template = constants.CHAT_TEMPLATES[model_name.split("/")[-1]]
2626        try:
2627            context, challenge_question, challenge_answer = await self.task_to_builder[
2628                task
2629            ](max_context_length_in_chars)
2630            positive_chunks, negative_chunks = self.filter_checker.get_chunks(context)
2631            synapse = self._build_protocol(
2632                chat_template,
2633                context,
2634                challenge_question,
2635                challenge_answer,
2636                positive_chunks,
2637                negative_chunks,
2638            )
2639        except Exception as e:
2640            raise e
2641        return synapse
2642
2643    @retry(max_attempts=3)
2644    async def _build_qa_conversation(self, max_chars: int) -> Tuple[str, List[str], List[str]]:
2645        context_qa_items = await self.synthesizer.get_qas(n=50)
2646        context = ""
2647        question_answer_pairs = []
2648        for qa_item in context_qa_items:
2649            if len(context) + len(qa_item.context_seed) > max_chars:
2650                continue
2651            context += f"\n{qa_item.context_seed}"
2652            questions = qa_item.questions
2653            answers = qa_item.answers
2654            question_answer_pairs.extend(list(zip(questions, answers)))
2655        random.shuffle(question_answer_pairs)
2656        challenge_qa_pairs = random.sample(question_answer_pairs, 5)
2657        challenge_questions = [qa_pair[0] for qa_pair in challenge_qa_pairs]
2658        challenge_answers = [qa_pair[1] for qa_pair in challenge_qa_pairs]
2659
2660        return context, challenge_questions, challenge_answers
2661
2662    def _build_protocol(
2663        self,
2664        chat_template: ChatTemplate,
2665        context: str,
2666        challenge_questions: List[str],
2667        challenge_answers: List[str],
2668        positive_chunks: List[str],
2669        negative_chunks: List[str],
2670    ) -> TextCompressProtocol:
2671        if chat_template is None:
2672            formatted_context = context
2673            formatted_questions = challenge_questions
2674        else:
2675            formatted_context = chat_template.apply_context_template(context)
2676            formatted_questions = [
2677                chat_template.apply_question_template(question)
2678                for question in challenge_questions
2679            ]
2680
2681        return TextCompressProtocol.model_validate(
2682            {
2683                "task_data": {
2684                    "original_context": context,
2685                    "challenge_questions": challenge_questions,
2686                    "challenge_answers": challenge_answers,
2687                    "formatted_questions": formatted_questions,
2688                    "positive_chunks": positive_chunks,
2689                    "formatted_context": formatted_context,
2690                    "negative_chunks": negative_chunks,
2691                },
2692                "context": formatted_context,
2693            }
2694        )
2695
2696
2697
2698---
2699File: /neural_condense_core/validator_utils/synthesizing/convo_generator.py
2700---
2701
2702from typing import Dict, List, Optional
2703import httpx
2704import substrateinterface as st
2705import time
2706import random
2707
2708
2709class ConvoGenerator:
2710    def __init__(
2711        self,
2712        keypair: st.Keypair,
2713    ):
2714        self.model_id = "chat-llama-3-1-8b"
2715        self.model_ids = [
2716            "chat-llama-3-1-8b",
2717            "chat-llama-3-1-70b",
2718        ]
2719        self.url = "https://api.nineteen.ai/v1/chat/completions"
2720        self.keypair = keypair
2721        self.client = httpx.AsyncClient()
2722
2723    def _get_headers(self):
2724        nonce = str(time.time_ns())
2725        signature = f"0x{self.keypair.sign(nonce).hex()}"
2726        return {
2727            "validator-hotkey": self.keypair.ss58_address,
2728            "signature": signature,
2729            "nonce": nonce,
2730            "netuid": "47",
2731            "Content-Type": "application/json",
2732        }
2733
2734    def _get_assistant_messages(self, messages, n_few_shots):
2735        a_messages = messages[n_few_shots:]  # Skip few shots
2736        for i in range(len(a_messages)):
2737            if a_messages[i]["role"] == "assistant":
2738                a_messages[i]["role"] = "user"
2739            else:
2740                a_messages[i]["role"] = "assistant"
2741        return a_messages
2742
2743    async def _make_api_call(self, messages, sampling_params):
2744        payload = sampling_params | {
2745            "model": random.choice(self.model_ids),
2746            "messages": messages,
2747        }
2748        response = await self.client.post(
2749            self.url, json=payload, headers=self._get_headers(), timeout=32
2750        )
2751        if response.status_code != 200:
2752            raise Exception(f"Nineteen API Error: {response.text}")
2753        data = response.json()
2754        content = data["choices"][0]["message"]["content"]
2755        return content
2756
2757    async def generate_conversation(
2758        self,
2759        messages_seed: Optional[List[Dict[str, str]]] = None,
2760        max_turns: int = 4,
2761    ):
2762        assert (
2763            messages_seed[0]["role"] == "user"
2764            and messages_seed[-1]["role"] == "assistant"
2765        ), "First and last message must be a user and assistant message respectively"
2766        assert (
2767            len(messages_seed) % 2 == 0
2768        ), "messages_seed must have an even number of messages"
2769
2770        reversed_messages_seed = []
2771        role_switcher = {"user": "assistant", "assistant": "user"}
2772        for message in messages_seed:
2773            content = message["content"]
2774            role = message["role"]
2775            reversed_messages_seed.append(
2776                {"role": role_switcher[role], "content": content}
2777            )
2778        assert max_turns % 2 == 0, "max_turns must be even"
2779        messages = [
2780            {
2781                "role": "user",
2782                "content": (
2783                    "Your task is to act as a human and questioning on me. "
2784                    "You can ask me anything, I will give you the answer."
2785                    "You have to talk like a human, concisely and dont show emotion."
2786                ),
2787            },
2788            {
2789                "role": "assistant",
2790                "content": "Sure, we will start with a simple question: 1+1=?",
2791            },
2792            {
2793                "role": "user",
2794                "content": "2",
2795            },
2796        ]
2797        n_few_shots = len(messages)
2798        messages.extend(reversed_messages_seed)
2799        sampling_params = {"temperature": 0.4, "max_tokens": 1024, "stream": False}
2800        # Get first response
2801        text = await self._make_api_call(messages, sampling_params)
2802        messages.append({"role": "assistant", "content": text})
2803
2804        # Generate multiple conversation turns
2805        assistant_messages = self._get_assistant_messages(messages, n_few_shots)
2806        for i in range(max_turns):
2807            # CALL ASSISTANT-MESSAGES -> ASSISTANT-MESSAGES
2808            text = await self._make_api_call(assistant_messages, sampling_params)
2809            assistant_messages.append({"role": "assistant", "content": text})
2810            messages.append({"role": "user", "content": text})
2811            if i == max_turns - 1:
2812                break
2813            # CALL MESSAGES -> FAKE--MESSAGES
2814            text = await self._make_api_call(messages, sampling_params)
2815            assistant_messages.append({"role": "user", "content": text})
2816            messages.append({"role": "assistant", "content": text})
2817
2818        total_chars = 0
2819        for i in range(len(assistant_messages)):
2820            total_chars += len(assistant_messages[i]["content"])
2821        return assistant_messages, total_chars
2822
2823    async def generate_qa_pairs(self, context_seed: str, num_questions: int = 1):
2824        prompt = f"""
2825You are an agent that generates questions from provided text.
2826Instructions:
2827- For provided text, generate {num_questions} questions that can be answered solely by the facts in the text.
2828- Return questions in a list format. Example: ["Question 1", "Question 2", "Question 3"]
2829
2830### Context ###
2831```
2832{context_seed}
2833```
2834"""
2835        messages = [
2836            {"role": "user", "content": prompt},
2837        ]
2838        sampling_params = {
2839            "temperature": 1.0,
2840            "stop": ["?"],
2841            "max_tokens": 512,
2842            "stream": False,
2843        }
2844        text = await self._make_api_call(messages, sampling_params)
2845        questions = self.extract_questions(text)
2846        if not questions:
2847            print(text)
2848        answers = []
2849        for question in questions:
2850            sampling_params = {"temperature": 0.4, "max_tokens": 1024, "stream": False}
2851            text = await self._make_api_call(
2852                [
2853                    {
2854                        "role": "user",
2855                        "content": f"{context_seed}\n\nBased on above context, answer the question concisely: {question}",
2856                    }
2857                ],
2858                sampling_params,
2859            )
2860            answers.append(text)
2861        total_chars = len(context_seed)
2862        for q, a in zip(questions, answers):
2863            total_chars += len(q) + len(a)
2864        return questions, answers, total_chars
2865
2866    def extract_questions(self, completion: str):
2867        # Extract based on format ["Question 1", "Question 2", "Question 3"]
2868        start_list = completion.find("[")
2869        end_list = completion.find("]")
2870        questions = completion[start_list + 1 : end_list]
2871        questions = eval(questions)
2872        return questions
2873
2874
2875
2876---
2877File: /neural_condense_core/validator_utils/synthesizing/filter_chunker.py
2878---
2879
2880from datasets import load_dataset
2881from typing import List, Tuple
2882import random
2883from semantic_text_splitter import TextSplitter
2884
2885
2886class FilterExistanceChecker:
2887    def __init__(self, chunk_size: int = 256):
2888        self.splitter = TextSplitter(chunk_size)
2889        self.negative_dataset = self._load_negative_dataset()
2890
2891    def _load_negative_dataset(self):
2892        negative_dataset = load_dataset(
2893            "TIGER-Lab/Fineweb-Instruct", streaming=True, split="train"
2894        )
2895        negative_dataset = negative_dataset.shuffle()
2896        negative_dataset = negative_dataset.filter(lambda x: len(x["response"]) > 1024)
2897        negative_dataset = negative_dataset.map(lambda x: {"text": x["response"]})
2898        negative_dataset = iter(negative_dataset)
2899        return negative_dataset
2900
2901    def _get_negative_message(self):
2902        try:
2903            return next(self.negative_dataset)["text"]
2904        except StopIteration:
2905            self.negative_dataset = self._load_negative_dataset()
2906            return self._get_negative_message()
2907
2908    def get_chunks(self, context: str) -> Tuple[List[str], List[str]]:
2909        # Test on positive case (text from conversation)
2910        chunks = self.splitter.chunks(context)
2911        # Get random 2 chunks
2912        positive_chunks = random.sample(chunks, 2)
2913        # Test on negative case (text not from conversation)
2914        negative_chunks = random.sample(
2915            self.splitter.chunks(self._get_negative_message()), 2
2916        )
2917        return positive_chunks, negative_chunks
2918
2919
2920
2921---
2922File: /neural_condense_core/validator_utils/synthesizing/scheduler.py
2923---
2924
2925import redis
2926from typing import Optional, List
2927import random
2928from datasets import load_dataset
2929from .convo_generator import ConvoGenerator
2930from .custom_dataset_loaders import load_context_datasets
2931from .schemas import (
2932    QASet,
2933    QASchedulerConfig,
2934)
2935from ...constants import constants
2936import time
2937from ...logger import logger
2938import asyncio
2939
2940
2941class Scheduler:
2942    def __init__(
2943        self,
2944        generator: ConvoGenerator,
2945        qa_config: QASchedulerConfig,
2946        refresh_time: float = 10.0,
2947    ):
2948        self.generator = generator
2949        self.qa_config = qa_config
2950        self.refresh_time = refresh_time
2951        self.context_datasets = load_context_datasets()
2952        redis_config = constants.DATABASE_CONFIG.redis
2953        self.redis = redis.Redis(
2954            host=redis_config.host,
2955            port=redis_config.port,
2956            db=redis_config.db,
2957            decode_responses=True,
2958        )
2959        self.qa_key = "qa_sets"
2960        self._prefill_queues()
2961        self.running = False
2962        self.loop = asyncio.get_event_loop()
2963
2964    def _prefill_queues(self):
2965        self.redis.delete(self.qa_key)
2966        cached_qa_ds = load_dataset(
2967            "Condense-AI/subnet-synthetic-dataset-v0.2", name="QA", split="train"
2968        )
2969        for qa_set in cached_qa_ds:
2970            item = QASet(**qa_set)
2971            self.redis.sadd(self.qa_key, item.model_dump_json())
2972
2973        logger.info(f"Prefilled QA: {self.redis.scard(self.qa_key)} items.")
2974
2975    def _get_next_context_seed(self):
2976        ds = random.choice(self.context_datasets)
2977        item = next(ds)
2978        return item["context"]
2979
2980    async def _refresh_qa_queue(self):
2981        while self.running:
2982            if self.redis.scard(self.qa_key) < self.qa_config.max_items:
2983                try:
2984                    context_seed = self._get_next_context_seed()
2985                    (
2986                        questions,
2987                        answers,
2988                        total_chars,
2989                    ) = await self.generator.generate_qa_pairs(
2990                        context_seed, num_questions=self.qa_config.n_qa_per_context
2991                    )
2992                    qa_set = QASet(
2993                        questions=questions,
2994                        answers=answers,
2995                        total_chars=total_chars,
2996                        context_seed=context_seed,
2997                    )
2998                    self.redis.sadd(self.qa_key, qa_set.model_dump_json())
2999                    current_time = time.strftime("%H:%M:%S")
3000                    logger.info(
3001                        f"QA Set: {self.redis.scard(self.qa_key)} - last_time: {current_time} - {total_chars} chars"
3002                    )
3003                except Exception as e:
3004                    logger.warning(f"Error generating QA set: {e}")
3005            else:
3006                self.redis.spop(self.qa_key)
3007            await asyncio.sleep(self.refresh_time)
3008
3009    def start(self):
3010        self.running = True
3011        self.loop.create_task(self._refresh_qa_queue())
3012
3013    def stop(self):
3014        self.running = False
3015
3016    async def get_qas(self, n: int = 1) -> Optional[List[QASet]]:
3017        items = self.redis.srandmember(self.qa_key, n)
3018        return [QASet.model_validate_json(item) for item in items] if items else None
3019
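
The scheduler leans on a handful of Redis set operations; a minimal sketch of that pattern, assuming a local Redis server and `decode_responses=True` as in the class above:

```python
import redis

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
r.delete("qa_sets")
r.sadd("qa_sets", '{"questions": ["Q1"], "answers": ["A1"], '
                  '"total_chars": 4, "context_seed": "seed"}')
print(r.scard("qa_sets"))             # 1  -> current queue size
print(r.srandmember("qa_sets", 1))    # random sample without removal (get_qas)
print(r.spop("qa_sets"))              # remove-and-return one member (trimming)
```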
3020
3021
3022---
3023File: /neural_condense_core/validator_utils/synthesizing/schemas.py
3024---
3025
3026from pydantic import BaseModel, Field
3027from typing import List
3028from pydantic import validator
3029
3030
3031class QASchedulerConfig(BaseModel):
3032    n_qa_per_context: int = Field(
3033        ..., description="Number of QA pairs to generate per context"
3034    )
3035    max_items: int = Field(..., description="Maximum number of items to generate")
3036
3037
3038class ConversationSchedulerConfig(BaseModel):
3039    n_new_conversations: int = Field(
3040        ..., description="Number of new conversations to generate"
3041    )
3042    n_previous_conversations: int = Field(
3043        ..., description="Number of previous conversations to keep from the dataset"
3044    )
3045    max_items: int = Field(..., description="Maximum number of items to generate")
3046
3047
3048class Message(BaseModel):
3049    role: str = Field(..., description="Role of the message sender (user or assistant)")
3050    content: str = Field(..., description="Content of the message")
3051
3052
3053class QASet(BaseModel):
3054    questions: List[str] = Field(..., description="List of generated questions")
3055    answers: List[str] = Field(..., description="List of corresponding answers")
3056    total_chars: int = Field(
3057        ..., description="Total character count of questions and answers"
3058    )
3059    context_seed: str = Field(
3060        ..., description="Original context used to generate QA pairs"
3061    )
3062
3063    @validator("questions", "answers")
3064    def validate_questions_and_answers(cls, v):
3065        if not v:
3066            raise ValueError("Questions and answers must be non-empty lists")
3067        return v
3068
3069
3070class Conversation(BaseModel):
3071    messages: List[Message] = Field(..., description="List of conversation messages")
3072    total_chars: int = Field(
3073        ..., description="Total character count of the conversation"
3074    )
3075    messages_seed: List[Message] = Field(
3076        ..., description="Original messages used to generate conversation"
3077    )
3078
3079    @validator("messages")
3080    def validate_messages(cls, v):
3081        if not v:
3082            raise ValueError("Messages must be non-empty lists")
3083        return v
3084
3085
3086
3087---
3088File: /neural_condense_core/validator_utils/synthesizing/utils.py
3089---
3090
3091# Define retry decorator
3092def retry(max_attempts=3):
3093    def decorator(func):
3094        def wrapper(*args, **kwargs):
3095            attempts = 0
3096            while attempts < max_attempts:
3097                try:
3098                    return func(*args, **kwargs)
3099                except Exception as e:
3100                    attempts += 1
3101                    if attempts == max_attempts:
3102                        raise e
3103            return None
3104
3105        return wrapper
3106
3107    return decorator
3108
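
Note that the decorator above wraps its target synchronously; when applied to an `async def` (as it is in `ChallengeGenerator`), the wrapper returns the coroutine without awaiting it, so exceptions raised while the coroutine runs are never retried. A minimal async-aware variant, offered as a sketch rather than as part of the repository, could look like:

```python
import functools

def retry_async(max_attempts=3):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)   # retry covers execution
                except Exception:
                    if attempt == max_attempts:
                        raise
            return None
        return wrapper
    return decorator
```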
3109
3110
3111---
3112File: /neural_condense_core/validator_utils/__init__.py
3113---
3114
3115from . import loop
3116from . import synthesizing
3117from . import managing
3118from . import monetize
3119
3120__all__ = [
3121    "loop",
3122    "synthesizing",
3123    "managing",
3124    "monetize",
3125]
3126
3127
3128
3129---
3130File: /neural_condense_core/__init__.py
3131---
3132
3133from . import base
3134from . import validator_utils
3135from . import miner_utils
3136from . import protocol
3137from . import common
3138from .constants import constants
3139from .logger import logger
3140
3141__version__ = "0.0.3"
3142version_split = __version__.split(".")
3143__spec_version__ = (
3144    (1000 * int(version_split[0]))
3145    + (10 * int(version_split[1]))
3146    + (1 * int(version_split[2]))
3147)
3148
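
The spec-version packing above maps `major.minor.patch` to `1000*major + 10*minor + patch`; a tiny worked example:

```python
def spec_version(version: str) -> int:
    major, minor, patch = (int(x) for x in version.split("."))
    return 1000 * major + 10 * minor + 1 * patch

assert spec_version("0.0.3") == 3      # the __version__ in this file
assert spec_version("1.2.7") == 1027
```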
3149__all__ = [
3150    "base",
3151    "validator_utils",
3152    "miner_utils",
3153    "protocol",
3154    "common",
3155    "constants",
3156    "logger",
3157]
3158
3159
3160
3161---
3162File: /neural_condense_core/constants.py
3163---
3164
3165from pydantic import BaseModel, Field
3166from typing import List, Dict
3167import os
3168
3169
3170class ChatTemplate(BaseModel):
3171    # Example of Mistral-7B-Instruct-v0.2
3172    bos_token: str = "<s>"
3173    eos_token: str = "</s>"
3174    user_start_token: str = "[INST]"
3175    user_end_token: str = "[/INST]"
3176    assistant_start_token: str = ""  # No specific start token for the assistant
3177    assistant_end_token: str = ""  # No specific end token for the assistant
3178
3179    def apply_context_template(self, context: str):
3180        return f"{self.bos_token} {self.user_start_token} {context}"
3181
3182    def apply_question_template(self, question: str):
3183        return f"\n\n{question} {self.user_end_token} {self.assistant_start_token}"
3184
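
Applied to a toy context and question, the template above produces the following (a sketch; the import path follows the file layout in this listing):

```python
from neural_condense_core.constants import ChatTemplate

template = ChatTemplate()               # Mistral-7B-Instruct-v0.2 defaults
prompt = template.apply_context_template("The sky is blue.")
prompt += template.apply_question_template("What color is the sky?")
print(prompt)
# <s> [INST] The sky is blue.
#
# What color is the sky? [/INST]
```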
3185
3186class TierConfig(BaseModel):
3187    incentive_percentage: float
3188    requests_per_epoch: int
3189    timeout: int
3190    supporting_models: List[str]
3191    max_condensed_tokens: int
3192    min_condensed_tokens: int
3193    max_compress_rate: float = 1.0
3194    min_compress_rate: float = 0.1
3195    max_context_length_in_chars: int
3196    accelerate_reward_scalar: float
3197
3198
3199class SyntheticTaskConfig(BaseModel):
3200    task: str
3201    criterias: List[str]
3202    rewarding_frequency: int
3203    weight: float
3204
3205
3206class RedisConfig(BaseModel):
3207    """Configuration for Redis connection"""
3208
3209    host: str = Field(default="localhost")
3210    port: int = Field(default=6379)
3211    db: int = Field(default=0)
3212    expire_time: int = Field(
3213        default=3600, description="Default expiration time in seconds"
3214    )
3215    serving_counter_key_format: str = Field(default="serving_counter:{tier}:{uid}")
3216
3217
3218class SqlConfig(BaseModel):
3219    """Configuration for SQL database connection"""
3220
3221    url: str = Field(
3222        default="sqlite:///miner_metadata.db",
3223        description="Database URL in SQLAlchemy format",
3224    )
3225
3226
3227class DatabaseConfig(BaseModel):
3228    """Combined database configuration"""
3229
3230    redis: RedisConfig = Field(default_factory=RedisConfig)
3231    sql: SqlConfig = Field(default_factory=SqlConfig)
3232
3233
3234class Constants(BaseModel):
3235    TIER_CONFIG: dict[str, TierConfig] = {
3236        "research": TierConfig(
3237            incentive_percentage=0.6,
3238            requests_per_epoch=256,
3239            timeout=32,
3240            accelerate_reward_scalar=0.1,
3241            supporting_models=["Condense-AI/Mistral-7B-Instruct-v0.2"],
3242            max_condensed_tokens=1536,
3243            min_condensed_tokens=128,
3244            max_context_length_in_chars=15000,
3245        ),
3246        "universal": TierConfig(
3247            incentive_percentage=0.4,
3248            requests_per_epoch=256,
3249            timeout=16,
3250            accelerate_reward_scalar=0.1,
3251            supporting_models=["unsloth/Meta-Llama-3.1-8B-Instruct"],
3252            max_condensed_tokens=-1,
3253            min_condensed_tokens=-1,
3254            max_context_length_in_chars=15000,
3255            max_compress_rate=0.8,
3256            min_compress_rate=0.1,
3257        ),
3258    }
3259
3260    SYNTHETIC_TASK_CONFIG: List[SyntheticTaskConfig] = [
3261        SyntheticTaskConfig(
3262            task="causal_conversation",
3263            criterias=["perplexity"],
3264            rewarding_frequency=1,
3265            weight=0,
3266        ),
3267        SyntheticTaskConfig(
3268            task="question_answering",
3269            criterias=["accuracy"],
3270            rewarding_frequency=1,
3271            weight=1,
3272        ),
3273        SyntheticTaskConfig(
3274            task="reconstruct_conversation",
3275            criterias=["perplexity"],
3276            rewarding_frequency=1,
3277            weight=0,
3278        ),
3279        SyntheticTaskConfig(
3280            task="trivial_qa_conversation",
3281            criterias=["accuracy"],
3282            rewarding_frequency=1,
3283            weight=0,
3284        ),
3285    ]
3286
3287    # Default values
3288    EPOCH_LENGTH: int = 600
3289    SCORING_PER_MINER_PER_EPOCH: int = 1
3290    SUBNET_TEMPO: int = 360
3291    MIN_STAKE: int = int(os.environ.get("MIN_STAKE", 10000))
3292    RPE_PERCENTAGE_FOR_SYNTHETIC: float = 0.1
3293    BATCH_SIZE: int = 8
3294    SET_WEIGHTS_TIMEOUT: int = 120
3295    ORGANIC_CLIENT_URL: str = "https://ncs-client.condenses.ai"
3296    REPORT_URL: str = "https://report.condenses.ai"
3297    ORGANIC_VERIFY_FREQUENCY: float = 0.1
3298    TOP_PERCENTAGE_FOR_ALLOCATING_WEIGHTS: float = 1.0
3299
3300    DATABASE_CONFIG: DatabaseConfig = Field(
3301        default_factory=lambda: DatabaseConfig(
3302            redis=RedisConfig(
3303                host=os.getenv("REDIS_HOST", "localhost"),
3304                port=int(os.getenv("REDIS_PORT", 6379)),
3305                db=int(os.getenv("REDIS_DB", 0)),
3306                expire_time=int(os.getenv("REDIS_EXPIRE_TIME", 3600)),
3307            ),
3308            sql=SqlConfig(
3309                url=os.getenv("SQL_DATABASE_URL", "sqlite:///miner_metadata.db")
3310            ),
3311        )
3312    )
3313
3314    CHAT_TEMPLATES: Dict[str, ChatTemplate] = {
3315        "Mistral-7B-Instruct-v0.2": ChatTemplate(
3316            bos_token="<s>",
3317            eos_token="</s>",
3318            user_start_token="[INST]",
3319            user_end_token="[/INST]",
3320            assistant_start_token="",
3321            assistant_end_token="",
3322        ),
3323    }
3324
3325    # Adjust values based on NETWORK environment variable
3326    def __init__(self, **data):
3327        super().__init__(**data)
3328        network = os.getenv("NETWORK")
3329        if network == "test":
3330            self.RPE_PERCENTAGE_FOR_SYNTHETIC = float(
3331                os.getenv("RPE_PERCENTAGE_FOR_SYNTHETIC", 0.5)
3332            )
3333            self.EPOCH_LENGTH = int(os.getenv("EPOCH_LENGTH", 600))
3334            self.MIN_STAKE = int(os.getenv("MIN_STAKE", 0))
3335            self.ORGANIC_CLIENT_URL = os.getenv(
3336                "ORGANIC_CLIENT_URL", "https://testnet-ncs-client.condenses.ai"
3337            )
3338            self.REPORT_URL = os.getenv(
3339                "REPORT_URL", "https://testnet-report.condenses.ai"
3340            )
3341
3342
3343constants = Constants()
3344
3345if __name__ == "__main__":
3346    import rich
3347
3348    for k, v in constants.model_dump().items():
3349        rich.print(f"- {k}: {v}")
3350
3351
3352
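As a rough illustration of how the chat template above is meant to be used, the following sketch reproduces what `apply_context_template` and `apply_question_template` produce for the default Mistral tokens (the context and question strings are made up for the example):

```python
# Sketch: formatting a challenge with the Mistral chat template defined above.
bos_token, user_start, user_end = "<s>", "[INST]", "[/INST]"

context = "Alice met Bob in Paris in 2019."
question = "Where did Alice meet Bob?"

formatted_context = f"{bos_token} {user_start} {context}"      # apply_context_template
formatted_question = f"\n\n{question} {user_end} "              # apply_question_template

prompt = formatted_context + formatted_question
print(prompt)
# <s> [INST] Alice met Bob in Paris in 2019.
#
# Where did Alice meet Bob? [/INST]
```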
3353---
3354File: /neural_condense_core/executor.py
3355---
3356
3357from concurrent.futures import ThreadPoolExecutor
3358
3359THREAD_POOL_SIZE: int = 8
3360THREAD_POOL: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
3361
3362
3363
3364---
3365File: /neural_condense_core/logger.py
3366---
3367
3368import structlog
3369
3370logger = structlog.get_logger()
3371
3372
3373
3374---
3375File: /neural_condense_core/protocol.py
3376---
3377
3378import re
3379from bittensor import Synapse
3380from typing import Any, List
3381import torch
3382from transformers import DynamicCache
3383from pydantic import BaseModel
3384from .common.file import load_npy_from_url
3385from .constants import TierConfig
3386import numpy as np
3387import io
3388from neural_condense_core.logger import logger
3389
3390
3391class Metadata(Synapse):
3392    metadata: dict = {}
3393
3394
3395class TaskData(BaseModel):
3396    formatted_context: str = ""
3397    original_context: str = ""
3398    challenge_questions: List[str] = []
3399    challenge_answers: List[str] = []
3400    formatted_questions: List[str] = []
3401    negative_chunks: List[str] = []
3402    positive_chunks: List[str] = []
3403
3404
3405class UtilData(BaseModel):
3406    compressed_kv_b64: str = ""
3407    compressed_length: int = 0
3408    download_time: float = 0.0
3409    bonus_compress_size: float = 0.0
3410    bonus_time: float = 0.0
3411    local_filename: str = ""
3412
3413
3414class MinerResponse(BaseModel):
3415    filename: str = ""
3416    compressed_context: str = ""
3417
3418
3419class BatchedScoringRequest(BaseModel):
3420    miner_responses: List[MinerResponse] = []
3421    task_data: TaskData = TaskData()
3422    target_model: str = ""
3423    criterias: List[str] = []
3424
3425
3426class TextCompressProtocol(Synapse):
3427    context: str = ""
3428    target_model: str = ""
3429    tier: str = ""
3430    compressed_kv_url: str = ""
3431    compressed_context: str = ""
3432    util_data: UtilData = UtilData()
3433    task_data: TaskData = TaskData()
3434
3435    @property
3436    def accelerate_score(self) -> float:
3437        return (self.util_data.bonus_compress_size + self.bonus_time) / 2
3438
3439    @property
3440    def bonus_time(self) -> float:
3441        if self.tier == "universal":
3442            return 1 - min(1, self.dendrite.process_time / self.timeout)
3443        else:
3444            return 1 - min(
3445                1,
3446                (self.dendrite.process_time + self.util_data.download_time)
3447                / self.timeout,
3448            )
3449
3450    @property
3451    def miner_payload(self) -> dict:
3452        return {"context": self.context, "target_model": self.target_model}
3453
3454    @property
3455    def miner_synapse(self):
3456        return TextCompressProtocol(
3457            **self.model_dump(include={"context", "target_model"})
3458        )
3459
3460    @property
3461    def validator_payload(self) -> dict:
3462        return {
3463            "task_data": self.task_data.model_dump(),
3464            "util_data": self.util_data.model_dump(),
3465            "tier": self.tier,
3466        }
3467
3468    @staticmethod
3469    def get_scoring_payload(
3470        responses: List["TextCompressProtocol"],
3471        ground_truth_synapse: "TextCompressProtocol",
3472        target_model: str,
3473        criterias: List[str],
3474    ) -> BatchedScoringRequest:
3475        if ground_truth_synapse.tier != "universal":
3476            return BatchedScoringRequest(
3477                miner_responses=[
3478                    {"filename": r.util_data.local_filename} for r in responses
3479                ],
3480                task_data=ground_truth_synapse.task_data,
3481                target_model=target_model,
3482                criterias=criterias,
3483            )
3484        else:
3485            return BatchedScoringRequest(
3486                miner_responses=[
3487                    {"compressed_context": r.compressed_context} for r in responses
3488                ],
3489                task_data=ground_truth_synapse.task_data,
3490                target_model=target_model,
3491                criterias=criterias,
3492            )
3493
3494    @staticmethod
3495    async def verify(
3496        response: "TextCompressProtocol",
3497        tier_config: TierConfig,
3498        tier: str,
3499        tokenizer=None,
3500        ground_truth_synapse: "TextCompressProtocol" = None,
3501    ) -> tuple[bool, str]:
3502        print(f"Verifying tier: {tier}")
3503        if tier == "universal":
3504            condensed_tokens = tokenizer.encode(response.compressed_context)
3505            original_tokens = tokenizer.encode(ground_truth_synapse.context)
3506            n_condensed_tokens = len(condensed_tokens)
3507            compress_rate = n_condensed_tokens / len(original_tokens)
3508            logger.info(f"Compress rate: {compress_rate}")
3509            if not (
3510                tier_config.min_compress_rate
3511                <= compress_rate
3512                <= tier_config.max_compress_rate
3513            ):
3514                return (
3515                    False,
3516                    f"Compression rate {compress_rate} is outside the valid range [{tier_config.min_compress_rate}, {tier_config.max_compress_rate}].",
3517                )
3518
3519            response.util_data.bonus_compress_size = 1 - compress_rate
3520            response.util_data.compressed_length = n_condensed_tokens
3521            return True, ""
3522        else:
3523            if not re.match(r"^https?://.*\.npy$", response.compressed_kv_url):
3524                return False, "Compressed KV URL must use HTTP or HTTPS."
3525
3526            compressed_kv, filename, download_time, error = await load_npy_from_url(
3527                response.compressed_kv_url
3528            )
3529            response.util_data.download_time = download_time
3530            response.util_data.local_filename = filename
3531            if compressed_kv is None:
3532                return (
3533                    False,
3534                    f"Failed to load compressed KV from URL ({filename}) after {download_time:.2f}s: {error}",
3535                )
3536            try:
3537                tensor = torch.from_numpy(compressed_kv)
3538                kv_cache = DynamicCache.from_legacy_cache(tensor)
3539            except Exception as e:
3540                return False, f"{error} -> {str(e)}"
3541
3542            if not (
3543                tier_config.min_condensed_tokens
3544                <= kv_cache._seen_tokens
3545                <= tier_config.max_condensed_tokens
3546            ):
3547                return False, f"Compressed KV length {kv_cache._seen_tokens} is outside the valid range [{tier_config.min_condensed_tokens}, {tier_config.max_condensed_tokens}]."
3548
3549            response.util_data.bonus_compress_size = 1 - (
3550                kv_cache._seen_tokens / tier_config.max_condensed_tokens
3551            )
3552            response.util_data.compressed_length = kv_cache._seen_tokens
3553            del kv_cache
3554            del compressed_kv
3555            return True, ""
3556
3557
3558
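To make the reward arithmetic in `bonus_time` and `accelerate_score` concrete, here is a small worked example for a research-tier miner; the numbers are illustrative, not measured:

```python
# Illustrative numbers only: a research-tier miner with a 32-second timeout.
timeout = 32.0
process_time = 8.0        # dendrite round-trip time
download_time = 4.0       # time to fetch the .npy from the miner's URL
compressed_length = 512
max_condensed_tokens = 1536

# bonus_time = 1 - min(1, (process_time + download_time) / timeout)
bonus_time = 1 - min(1, (process_time + download_time) / timeout)    # 0.625

# bonus_compress_size = 1 - compressed_length / max_condensed_tokens
bonus_compress_size = 1 - compressed_length / max_condensed_tokens   # ~0.667

# accelerate_score is the average of the two bonuses
accelerate_score = (bonus_compress_size + bonus_time) / 2            # ~0.646
print(round(bonus_time, 3), round(bonus_compress_size, 3), round(accelerate_score, 3))
```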
3559---
3560File: /neurons/miner.py
3561---
3562
3563import neural_condense_core as ncc
3564import httpx
3565from typing import Tuple
3566import bittensor as bt
3567import time
3568import traceback
3569import redis
3570
3571
3572class Miner(ncc.base.BaseMiner):
3573    def __init__(self):
3574        super().__init__()
3575        self.blacklist_fns.append(self.blacklist_fn)
3576        self.forward_fns.append(self.forward_text_compress)
3577        self.redis_config = ncc.constants.DATABASE_CONFIG.redis
3578        self.redis = redis.Redis(
3579            host=self.redis_config.host,
3580            port=self.redis_config.port,
3581            db=self.redis_config.db,
3582            decode_responses=True,
3583        )
3584        self.setup_logging()
3585        self._initialize_rate_limits()
3586
3587    def _initialize_rate_limits(self):
3588        r"""
3589        Initializes the rate limits for the miners.
3590        """
3591        self.rate_limits = {
3592            uid: ncc.validator_utils.managing.ServingCounter(
3593                rate_limit=rate_limit,
3594                uid=uid,
3595                tier=self.config.miner.tier,
3596                redis_client=self.redis,
3597                postfix_key=self.config.axon.port,
3598            )
3599            for uid, rate_limit in ncc.common.build_rate_limit(
3600                self.metagraph, self.config, tier=self.config.miner.tier
3601            ).items()
3602        }
3603        for k, v in self.rate_limits.items():
3604            v.reset_counter()
3605            bt.logging.info(
3606                f"Reset rate limit for {k}: {v.get_current_count()}/{v.rate_limit}"
3607            )
3608
3609    def run(self):
3610        self.setup_axon()
3611        bt.logging.info("Starting main loop")
3612        step = 0
3613        while True:
3614            try:
3615                # Periodically update our knowledge of the network graph.
3616                if step % 60 == 0:
3617                    self.metagraph.sync()
3618                    self._initialize_rate_limits()
3619                    log = (
3620                        f"Block: {self.metagraph.block.item()} | "
3621                        f"Incentive: {self.metagraph.I[self.my_subnet_uid]} | "
3622                    )
3623                    bt.logging.info(log)
3624                step += 1
3625                time.sleep(10)
3626
3627            except KeyboardInterrupt:
3628                self.axon.stop()
3629                bt.logging.success("Miner killed by keyboard interrupt.")
3630                break
3631            except Exception as e:
3632                bt.logging.error(f"Miner exception: {e}")
3633                bt.logging.error(traceback.format_exc())
3634                continue
3635
3636    async def forward_text_compress(
3637        self, synapse: ncc.protocol.TextCompressProtocol
3638    ) -> ncc.protocol.TextCompressProtocol:
3639        r"""
3640        Forward function for the Text-Compress task.
3641        Args:
3642            synapse (TextCompressProtocol): The synapse containing the text to compress.
3643        Returns:
3644            synapse (TextCompressProtocol): The synapse containing the compressed tokens.
3645        """
3646        bt.logging.info(
3647            f"Forwarding text compress: {synapse.context[:100]}...{synapse.context[-100:]}"
3648        )
3649        bt.logging.info(f"Context length: {len(synapse.context)}")
3650
3651        payload = synapse.miner_payload
3652
3653        async with httpx.AsyncClient(timeout=synapse.timeout) as client:
3654            response = await client.post(
3655                f"http://{self.config.miner.backend_host}:{self.config.miner.backend_port}/condense",
3656                json=payload,
3657            )
3658            response = response.json()
3659
3660            if self.config.miner.tier == "universal":
3661                synapse.compressed_context = response["compressed_context"]
3662                return synapse
3663
3664            elif self.config.miner.tier == "research":
3665                compressed_kv_url = response["compressed_kv_url"]
3666                bt.logging.info(f"Compressed & uploaded to {compressed_kv_url}")
3667                return ncc.protocol.TextCompressProtocol(
3668                    compressed_kv_url=compressed_kv_url
3669                )
3670
3671    def blacklist_fn(
3672        self, synapse: ncc.protocol.TextCompressProtocol
3673    ) -> Tuple[bool, str]:
3674        r"""
3675        Blacklist function for the Text-Compress task.
3676        Args:
3677            synapse (TextCompressProtocol): The synapse containing the text to compress.
3678        Returns:
3679            bool: Whether to blacklist the synapse.
3680            reason (str): The reason for blacklisting the synapse.
3681        """
3682        hotkey = synapse.dendrite.hotkey
3683        uid = self.metagraph.hotkeys.index(hotkey)
3684        stake = self.metagraph.S[uid]
3685        if stake < ncc.constants.MIN_STAKE:
3686            return True, "Stake too low."
3687        allowed = self.rate_limits[uid].increment()
3688        bt.logging.info(
3689            f"Rate limit: {uid} {self.rate_limits[uid].get_current_count()}/{self.rate_limits[uid].rate_limit}"
3690        )
3691        if not allowed:
3692            return True, "Rate limit exceeded."
3693        return False, ""
3694
3695
3696if __name__ == "__main__":
3697    miner = Miner()
3698    miner.run()
3699
3700
3701
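The miner simply relays `synapse.miner_payload` to its local backend and maps the response back onto the protocol. A sketch of exercising that backend contract directly is shown below; the host, port, and sample context are assumptions for illustration:

```python
# Sketch: calling the miner's local /condense backend directly.
import httpx

payload = {
    "context": "A long document to be condensed ...",
    "target_model": "Condense-AI/Mistral-7B-Instruct-v0.2",
}

response = httpx.post("http://localhost:8080/condense", json=payload, timeout=32)
data = response.json()

# Research-tier backends return a URL to the compressed KV cache (.npy);
# universal-tier backends return the compressed text itself.
print(data.get("compressed_kv_url") or data.get("compressed_context"))
```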
3702---
3703File: /neurons/validator.py
3704---
3705
3706from neural_condense_core import (
3707    base,
3708    validator_utils as vutils,
3709    constants,
3710    logger,
3711)
3712from neural_condense_core.common import clean_tmp_directory
3713from neural_condense_core.protocol import TextCompressProtocol
3714import pandas as pd
3715import bittensor as bt
3716import random
3717from transformers import AutoTokenizer
3718import traceback
3719import asyncio
3720from concurrent.futures import ThreadPoolExecutor
3721import time
3722
3723
3724class Validator(base.BaseValidator):
3725    def __init__(self):
3726        super().__init__()
3727        self.miner_manager = vutils.managing.MinerManager(
3728            uid=self.uid,
3729            wallet=self.wallet,
3730            metagraph=self.metagraph,
3731            config=self.config,
3732        )
3733        self.challenge_generator = vutils.synthesizing.ChallengeGenerator(
3734            keypair=self.dendrite.keypair
3735        )
3736
3737        if self.config.validator.use_wandb:
3738            vutils.loop.initialize_wandb(self.dendrite, self.metagraph, self.uid)
3739
3740        self.set_weights_executor = ThreadPoolExecutor(max_workers=1)
3741        self.metadata_task = None
3742        self.metadata_interval = 600  # 10 minutes in seconds
3743
3744    async def start_epoch(self):
3745        clean_tmp_directory()
3746        logger.info("Running epoch.")
3747        await self.miner_manager.sync()
3748
3749        tasks = [
3750            self.loop.create_task(self._forward_tier(tier))
3751            for tier in constants.TIER_CONFIG
3752        ]
3753        try:
3754            await asyncio.gather(*tasks, return_exceptions=True)
3755        except asyncio.TimeoutError as e:
3756            logger.warning(f"Epoch tasks timed out: {e}")
3757        except Exception as e:
3758            logger.error(f"Error running epoch tasks: {e}")
3759            traceback.print_exc()
3760
3761    async def report_metadata(self):
3762        try:
3763            await self.miner_manager.report_metadata()
3764            logger.info("Reported metadata successfully")
3765        except Exception as e:
3766            logger.error(f"Failed to report metadata: {e}")
3767
3768    async def _forward_tier(self, tier: str):
3769        try:
3770            if constants.TIER_CONFIG[tier].incentive_percentage == 0:
3771                logger.info(f"Tier {tier} has no incentive percentage.")
3772                return
3773            model_name = random.choice(constants.TIER_CONFIG[tier].supporting_models)
3774            tokenizer = AutoTokenizer.from_pretrained(model_name)
3775            serving_counter = self.miner_manager.serving_counter.get(tier, {})
3776
3777            if not serving_counter:
3778                logger.info(f"No miners in tier {tier}.")
3779                return
3780            rate_limit = self.miner_manager.rate_limit_per_tier[tier]
3781            n_sets = max(
3782                int(rate_limit * constants.RPE_PERCENTAGE_FOR_SYNTHETIC),
3783                1,
3784            )
3785            futures = []
3786        except Exception as e:
3787            logger.error(f"Error in _forward_tier: {e}")
3788            traceback.print_exc()
3789            return
3790
3791        task_config = vutils.loop.get_task_config()
3792        model_name = random.choice(constants.TIER_CONFIG[tier].supporting_models)
3793        tokenizer = AutoTokenizer.from_pretrained(model_name)
3794        ground_truth_synapses = [
3795            await vutils.loop.prepare_synapse(
3796                challenge_generator=self.challenge_generator,
3797                tier=tier,
3798                task_config=task_config,
3799                tier_config=constants.TIER_CONFIG[tier],
3800                model_name=model_name,
3801            )
3802            for _ in range(n_sets)
3803        ]
3804
3805        sleep_per_set = constants.EPOCH_LENGTH / n_sets
3806
3807        logger.info(f"Prepared {len(ground_truth_synapses)} ground truth synapses.")
3808
3809        for i, ground_truth_synapse in enumerate(ground_truth_synapses):
3810            logger.info(
3811                f"Processing set {i}/{n_sets} then sleeping for {sleep_per_set} seconds."
3812            )
3813            total_uids = list(serving_counter.keys())
3814            logger.info(f"Total uids: {total_uids}")
3815            random.shuffle(total_uids)
3816            batched_uids = [total_uids[i : i + 4] for i in range(0, len(total_uids), 4)]
3817
3818            for uids in batched_uids:
3819                start_time = time.time()
3820                # Wait if we have too many pending futures
3821                pending_futures = [f for f in futures if not f.done()]
3822                while len(pending_futures) >= 10:
3823                    logger.info(
3824                        f"Waiting for {len(pending_futures)} futures to complete."
3825                    )
3826                    await asyncio.sleep(1)
3827                    # Clean up completed futures
3828                    futures = [f for f in futures if not f.done()]
3829                    pending_futures = [f for f in futures if not f.done()]
3830
3831                logger.info(
3832                    "Processing batch",
3833                    uids=uids,
3834                    sleep=sleep_per_set / len(batched_uids),
3835                )
3836                future = self.loop.create_task(
3837                    self._forward_batch(
3838                        tier,
3839                        model_name,
3840                        uids,
3841                        ground_truth_synapse,
3842                        task_config,
3843                        tokenizer,
3844                    )
3845                )
3846                futures.append(future)
3847                await asyncio.sleep(sleep_per_set / len(batched_uids))
3848            await asyncio.gather(*futures, return_exceptions=True)
3849            logger.info(f"Finished processing batch {i}/{n_sets}.")
3850            logger.info(f"Setting weights for batch {i}/{n_sets}.")
3851            await self.report_metadata()
3852            self.set_weights()
3853        logger.info("Finished processing all batches.")
3854
3855    async def _forward_batch(
3856        self,
3857        tier: str,
3858        model_name: str,
3859        batched_uids: list[int],
3860        ground_truth_synapse: TextCompressProtocol,
3861        task_config,
3862        tokenizer=None,
3863    ):
3864        try:
3865            dendrite = bt.dendrite(self.wallet)
3866            synapse = ground_truth_synapse.miner_synapse
3867            logger.info(f"Querying miners {batched_uids}.")
3868            responses = await vutils.loop.query_miners(
3869                dendrite=dendrite,
3870                metagraph=self.metagraph,
3871                uids=batched_uids,
3872                synapse=synapse,
3873                timeout=constants.TIER_CONFIG[tier].timeout,
3874            )
3875
3876            if not responses:
3877                logger.warning(f"No responses from {batched_uids}.")
3878                return
3879            try:
3880                logger.info(f"Validating responses for {batched_uids}.")
3881                logger.info(responses[0].compressed_context)
3882                (
3883                    valid_responses,
3884                    valid_uids,
3885                    invalid_uids,
3886                    invalid_reasons,
3887                ) = await vutils.loop.validate_responses(
3888                    responses=responses,
3889                    uids=batched_uids,
3890                    tier_config=constants.TIER_CONFIG[tier],
3891                    tokenizer=tokenizer,
3892                    tier=tier,
3893                    ground_truth_synapse=ground_truth_synapse,
3894                )
3895            except Exception as e:
3896                logger.error(f"Error validating responses: {e}")
3897                traceback.print_exc()
3898                return
3899            try:
3900                logger.info(
3901                    f"Processing and scoring responses for valid_uids: {valid_uids}"
3902                )
3903                start_time = time.time()
3904                logs, total_uids = await vutils.loop.process_and_score_responses(
3905                    miner_manager=self.miner_manager,
3906                    valid_responses=valid_responses,
3907                    valid_uids=valid_uids,
3908                    invalid_uids=invalid_uids,
3909                    ground_truth_synapse=ground_truth_synapse,
3910                    model_name=model_name,
3911                    task_config=task_config,
3912                    tier_config=constants.TIER_CONFIG[tier],
3913                    config=self.config,
3914                    invalid_reasons=invalid_reasons,
3915                    timeout=300,
3916                    tier=tier,
3917                )
3918                end_time = time.time()
3919                logger.info(
3920                    f"Time taken to process and score responses: {end_time - start_time:.2f} seconds"
3921                )
3922            except Exception as e:
3923                logger.error(f"Error processing and scoring responses: {e}")
3924                return
3925
3926            batch_information = (
3927                f"Batch Metrics - {tier} - {model_name} - {task_config.task}"
3928            )
3929            batch_report_df = vutils.loop.logging.log_as_dataframe(logs)
3930            logger.info(
3931                f"Logging dataframe {batch_information}:\n{batch_report_df.to_markdown()}"
3932            )
3933
3934            if self.config.validator.use_wandb:
3935                vutils.loop.logging.log_wandb(logs, total_uids, tier=tier)
3936
3937            await self.miner_manager.report(
3938                {
3939                    "comparision": batch_report_df.to_dict(),
3940                    "challenge": ground_truth_synapse.validator_payload,
3941                    "task": task_config.task,
3942                    "tier": tier,
3943                },
3944                "api/report-batch",
3945            )
3946
3947        except Exception as e:
3948            traceback.print_exc()
3949            logger.error(f"Error: {e}")
3950
3951    def set_weights(self):
3952        try:
3953            self.current_block = self.subtensor.get_current_block()
3954        except OSError as e:
3955            logger.warning(f"Subtensor not available, reconnecting: {e}")
3956            self.subtensor = bt.subtensor(config=self.config)
3957            logger.info("Reconnected to subtensor.")
3958            self.current_block = self.subtensor.get_current_block()
3959        except Exception as e:
3960            logger.error(f"Error getting current block: {e}")
3961            traceback.print_exc()
3962            return
3963        self.last_update = self.metagraph.last_update[self.uid]
3964        weights = self.miner_manager.get_normalized_ratings(
3965            top_percentage=constants.TOP_PERCENTAGE_FOR_ALLOCATING_WEIGHTS
3966        )
3967        (
3968            processed_weight_uids,
3969            processed_weights,
3970        ) = bt.utils.weight_utils.process_weights_for_netuid(
3971            uids=self.metagraph.uids,
3972            weights=weights,
3973            netuid=self.config.netuid,
3974            subtensor=self.subtensor,
3975            metagraph=self.metagraph,
3976        )
3977        (
3978            uint_uids,
3979            uint_weights,
3980        ) = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
3981            uids=processed_weight_uids, weights=processed_weights
3982        )
3983        if self.current_block > self.last_update + constants.SUBNET_TEMPO:
3984            weight_info = list(zip(uint_uids, uint_weights))
3985            weight_info_df = pd.DataFrame(weight_info, columns=["uid", "weight"])
3986            logger.info(f"Weight info:\n{weight_info_df.to_markdown()}")
3987            logger.info("Actually trying to set weights.")
3988            try:
3989                future = self.set_weights_executor.submit(
3990                    self.subtensor.set_weights,
3991                    netuid=self.config.netuid,
3992                    wallet=self.wallet,
3993                    uids=uint_uids,
3994                    weights=uint_weights,
3995                )
3996                success, msg = future.result(timeout=120)
3997                if not success:
3998                    logger.error(f"Failed to set weights: {msg}")
3999                else:
4000                    logger.info(f"Set weights result: {success}")
4001            except Exception as e:
4002                logger.error(f"Failed to set weights: {e}")
4003                traceback.print_exc()
4004        else:
4005            logger.info(
4006                f"Not setting weights because current block {self.current_block} is not greater than last update {self.last_update} + tempo {constants.SUBNET_TEMPO}"
4007            )
4008
4009
4010if __name__ == "__main__":
4011    with Validator() as validator:
4012        while True:
4013            logger.info("validator_status", object=validator)
4014            time.sleep(60 * 10)
4015
4016
4017
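For intuition on the synthetic scheduling in `_forward_tier`, a small worked example with the default constants; the per-tier rate limit and miner count are illustrative assumptions:

```python
# Illustrative: how many synthetic sets a tier receives per epoch and the pacing.
EPOCH_LENGTH = 600                       # seconds
RPE_PERCENTAGE_FOR_SYNTHETIC = 0.1
rate_limit = 256                         # assumed per-tier rate limit for this sketch

n_sets = max(int(rate_limit * RPE_PERCENTAGE_FOR_SYNTHETIC), 1)   # 25
sleep_per_set = EPOCH_LENGTH / n_sets                             # 24.0 seconds

# Each set is split into batches of 4 miners; with 40 serving miners that is
# 10 batches, so roughly 2.4 seconds of sleep between batch dispatches.
n_miners = 40
batches = (n_miners + 3) // 4
print(n_sets, sleep_per_set, sleep_per_set / batches)
```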
4018---
4019File: /scripts/enable_hf_transfer.sh
4020---
4021
4022uv add hf_transfer --prerelease=allow
4023export HF_HUB_ENABLE_HF_TRANSFER=1
4024
4025
4026---
4027File: /scripts/install_redis.sh
4028---
4029
4030#!/bin/bash
4031
4032# Function to check if the user has root privileges
4033check_root() {
4034    if [ "$(id -u)" -ne 0 ]; then
4035        echo "You are not running as root. Commands requiring root will use 'sudo'."
4036        SUDO="sudo"
4037    else
4038        echo "You are running as root. 'sudo' is not required."
4039        SUDO=""
4040    fi
4041}
4042
4043# Run the root check
4044check_root
4045
4046# Update the package list
4047$SUDO apt update
4048
4049# Install Redis
4050$SUDO apt install -y redis
4051
4052
4053
4054# Verify installation
4055if redis-cli --version; then
4056    echo "Redis installed successfully."
4057else
4058    echo "Redis installation failed."
4059    exit 1
4060fi
4061
4062# Attempt to start Redis with systemctl
4063echo "Attempting to start Redis using systemctl..."
4064if $SUDO systemctl start redis 2>/dev/null; then
4065    echo "Redis started successfully using systemctl."
4066else
4067    echo "systemctl not available or failed. Starting Redis manually..."
4068    if redis-server --daemonize yes; then
4069        echo "Redis started manually in the background."
4070    else
4071        echo "Failed to start Redis. Check your setup."
4072        exit 1
4073    fi
4074fi
4075
4076# Enable Redis to start on boot (optional, for non-WSL environments)
4077if $SUDO systemctl enable redis 2>/dev/null; then
4078    echo "Redis enabled to start on boot using systemctl."
4079else
4080    echo "systemctl not available. Skipping boot configuration."
4081fi
4082
4083# Test Redis
4084if redis-cli ping | grep -q "PONG"; then
4085    echo "Redis is working correctly!"
4086else
4087    echo "Redis test failed. Check the service status or logs."
4088fi
4089
4090
4091
4092---
4093File: /services/miner_backend/soft_token/soft_token_condenser_modeling.py
4094---
4095
4096import torch
4097import torch.nn as nn
4098import huggingface_hub
4099from transformers import AutoModelForCausalLM, AutoTokenizer
4100
4101
4102class Condenser(nn.Module):
4103    """
4104    A neural module for condensing large text contexts into smaller dense representations
4105    """
4106
4107    def __init__(
4108        self,
4109        num_condense_tokens: int,
4110        hidden_size: int,
4111        n_last_hidden_states: int,
4112        condense_model: AutoModelForCausalLM,
4113        condense_tokenizer: AutoTokenizer,
4114    ):
4115        super().__init__()
4116        self.dtype = torch.bfloat16
4117        self.num_condense_tokens = num_condense_tokens
4118        self.hidden_size = hidden_size
4119        self.n_last_hidden_states = n_last_hidden_states
4120        self.condense_model = condense_model
4121        self.condense_tokenizer = condense_tokenizer
4122
4123        self.norm = nn.LayerNorm(self.hidden_size * n_last_hidden_states).to(
4124            dtype=self.dtype, device="cuda"
4125        )
4126        self.linear = nn.Linear(self.hidden_size * n_last_hidden_states, 4096).to(
4127            dtype=self.dtype, device="cuda"
4128        )
4129        self.pre_condensed_tokens = nn.Parameter(
4130            torch.randn(
4131                1,
4132                num_condense_tokens,
4133                self.hidden_size,
4134                dtype=self.dtype,
4135                device="cuda",
4136            )
4137        )
4138
4139    @classmethod
4140    def from_pretrained(cls, repo_id, local_dir="./", dtype=torch.bfloat16):
4141        # Download and load checkpoint
4142        file_path = huggingface_hub.hf_hub_download(
4143            repo_id=repo_id,
4144            filename="checkpoints/modules.pt",
4145            local_dir=local_dir,
4146        )
4147        state_dict = torch.load(file_path)
4148
4149        # Extract model configuration
4150        num_condense_tokens = state_dict["modules"]["pre_condensed_tokens"].shape[1]
4151        hidden_size = state_dict["modules"]["pre_condensed_tokens"].shape[2]
4152        linear_input_dim = state_dict["modules"]["linear_state_dict"]["weight"].shape[1]
4153        n_last_hidden_states = linear_input_dim // hidden_size
4154
4155        # Load model and tokenizer
4156        condense_model = AutoModelForCausalLM.from_pretrained(
4157            repo_id, torch_dtype=dtype
4158        ).to("cuda")
4159        condense_tokenizer = AutoTokenizer.from_pretrained("unsloth/Llama-3.2-1B")
4160        condense_tokenizer.pad_token = condense_tokenizer.eos_token
4161
4162        # Initialize and load state_dict
4163        model = cls(
4164            num_condense_tokens,
4165            hidden_size,
4166            n_last_hidden_states,
4167            condense_model,
4168            condense_tokenizer,
4169        )
4170        model.load_state_dict(state_dict["modules"])
4171        return model
4172
4173    def load_state_dict(self, state_dict: dict):
4174        self.pre_condensed_tokens.data = state_dict["pre_condensed_tokens"].to(
4175            dtype=self.dtype, device="cuda"
4176        )
4177        self.linear.load_state_dict(
4178            {
4179                k: v.to(dtype=self.dtype, device="cuda")
4180                for k, v in state_dict["linear_state_dict"].items()
4181            }
4182        )
4183        self.norm.load_state_dict(
4184            {
4185                k: v.to(dtype=self.dtype, device="cuda")
4186                for k, v in state_dict["norm_state_dict"].items()
4187            }
4188        )
4189
4190    @torch.no_grad()
4191    def compress(self, context: str) -> torch.Tensor:
4192        # Tokenize and process context
4193        output = self.condense_tokenizer(
4194            context,
4195            return_tensors="pt",
4196            add_special_tokens=False,
4197            padding="max_length",
4198            max_length=4096,
4199            truncation=True,
4200            return_attention_mask=True,
4201        )
4202        context_ids = output.input_ids.to(device="cuda")
4203        attention_mask = output.attention_mask.to(device="cuda")
4204
4205        # Processing embedding and condensation
4206        context_embeds = self.condense_model.get_input_embeddings()(context_ids)
4207        inputs_embeds_condense = torch.cat(
4208            [context_embeds, self.pre_condensed_tokens], dim=1
4209        )
4210        expanded_attention_mask = torch.cat(
4211            [
4212                attention_mask,
4213                torch.ones(
4214                    attention_mask.shape[0],
4215                    self.num_condense_tokens,
4216                    dtype=attention_mask.dtype,
4217                    device=attention_mask.device,
4218                ),
4219            ],
4220            dim=1,
4221        )
4222
4223        # Generate condensed tokens
4224        output = self.condense_model(
4225            inputs_embeds=inputs_embeds_condense,
4226            output_hidden_states=True,
4227            attention_mask=expanded_attention_mask,
4228        )
4229        hidden_states = torch.cat(
4230            output.hidden_states[-self.n_last_hidden_states :], dim=-1
4231        )[:, -self.num_condense_tokens :, :]
4232
4233        return self.linear(self.norm(hidden_states))
4234
4235
4236
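To clarify how `compress` turns a long context into a fixed number of soft tokens, the sketch below walks through the tensor shapes with toy sizes; these dimensions are illustrative and do not match the real checkpoint (which pads to 4,096 tokens and projects to a 4,096-wide embedding):

```python
import torch

# Toy sizes only: hidden_size=64, 8 condense tokens, last 2 hidden layers, 32-token context.
hidden_size, num_condense_tokens, n_last, seq_len = 64, 8, 2, 32

context_embeds = torch.randn(1, seq_len, hidden_size)
pre_condensed = torch.randn(1, num_condense_tokens, hidden_size)

# The learned soft tokens are appended after the context embeddings.
inputs_embeds = torch.cat([context_embeds, pre_condensed], dim=1)
print(inputs_embeds.shape)  # torch.Size([1, 40, 64])

# The last n_last hidden states are concatenated on the feature axis, only the
# condensed positions are kept, then LayerNorm + Linear project them to the target width.
hidden_states = torch.randn(1, seq_len + num_condense_tokens, hidden_size * n_last)
condensed = hidden_states[:, -num_condense_tokens:, :]
projected = torch.nn.Linear(hidden_size * n_last, hidden_size)(condensed)
print(condensed.shape, projected.shape)  # torch.Size([1, 8, 128]) torch.Size([1, 8, 64])
```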
4237---
4238File: /services/miner_backend/app.py
4239---
4240
4241from flask import Flask, request, jsonify
4242import time
4243from transformers import AutoModelForCausalLM, AutoTokenizer, DynamicCache
4244from kvpress import KnormPress
4245import torch
4246from .soft_token.soft_token_condenser_modeling import Condenser
4247import os
4248import minio
4249import structlog
4250from .utils import upload_to_minio
4251import argparse
4252
4253logger = structlog.get_logger()
4254logger.info("This will show in Uvicorn logs")
4255
4256
4257class CompressionService:
4258    def __init__(self, algorithm: str):
4259        self.dtype = torch.bfloat16
4260        self.algorithm = algorithm
4261        self._init_minio_client()
4262        self._init_model()
4263
4264    def _init_minio_client(self):
4265        """Initialize MinIO client and validate config"""
4266        self.bucket_name = os.getenv("MINIO_BUCKET", "condense_miner")
4267        self.endpoint_url = os.getenv("MINIO_SERVER")
4268
4269        self._validate_minio_config()
4270
4271        self.minio_client = minio.Minio(
4272            self.endpoint_url.replace("http://", "").replace("https://", ""),
4273            access_key=os.getenv("MINIO_ACCESS_KEY"),
4274            secret_key=os.getenv("MINIO_SECRET_KEY"),
4275            secure=False,
4276        )
4277
4278    def _init_model(self):
4279        """Initialize model based on selected algorithm"""
4280        self.device = "cuda"
4281
4282        if self.algorithm == "kvpress":
4283            self.ckpt = "Condense-AI/Mistral-7B-Instruct-v0.2"
4284            self.tokenizer = AutoTokenizer.from_pretrained(self.ckpt)
4285            self.model = AutoModelForCausalLM.from_pretrained(
4286                self.ckpt, torch_dtype=self.dtype
4287            ).to(self.device)
4288            self.press = KnormPress(compression_ratio=0.75)
4289
4290        elif self.algorithm == "soft_token":
4291            self.ckpt = "Condense-AI/Mistral-7B-Instruct-v0.2"
4292            self.repo_id = "Condense-AI/Soft-Token-Condenser-Llama-3.2-1B"
4293            self.condenser = Condenser.from_pretrained(self.repo_id, dtype=self.dtype)
4294            self.condenser.eval()
4295            self.tokenizer = AutoTokenizer.from_pretrained(self.ckpt)
4296            self.model = AutoModelForCausalLM.from_pretrained(
4297                self.ckpt, torch_dtype=self.dtype
4298            ).to(self.device)
4299            self.press = KnormPress(compression_ratio=0.75)
4300
4301        elif self.algorithm == "activation_beacon":
4302            self.ckpt = "namespace-Pt/ultragist-mistral-7b-inst"
4303            self.tokenizer = AutoTokenizer.from_pretrained(
4304                self.ckpt,
4305                trust_remote_code=True,
4306            )
4307            self.model = AutoModelForCausalLM.from_pretrained(
4308                self.ckpt,
4309                trust_remote_code=True,
4310                torch_dtype=self.dtype,
4311                attn_implementation="sdpa",
4312                ultragist_ratio=[4],
4313            ).to(self.device)
4314
4315    @torch.no_grad()
4316    def compress_context(self, context: str) -> str:
4317        """Compress context using selected algorithm"""
4318        if self.algorithm == "kvpress":
4319            return self._compress_kvpress(context)
4320        elif self.algorithm == "soft_token":
4321            return self._compress_soft_token(context)
4322        elif self.algorithm == "activation_beacon":
4323            return self._compress_activation_beacon(context)
4324
4325    def _compress_kvpress(self, context: str) -> str:
4326        input_ids = self.tokenizer(
4327            context, return_tensors="pt", add_special_tokens=False
4328        ).input_ids.to(self.device)
4329
4330        with torch.no_grad(), self.press(self.model):
4331            past_key_values = self.model(
4332                input_ids, num_logits_to_keep=1
4333            ).past_key_values
4334
4335        return self._save_and_return_url(past_key_values)
4336
4337    def _compress_soft_token(self, context: str) -> str:
4338        compressed_tokens = self.condenser.compress(context)
4339
4340        with torch.no_grad(), self.press(self.model):
4341            past_key_values = self.model(
4342                inputs_embeds=compressed_tokens
4343            ).past_key_values
4344
4345        return self._save_and_return_url(past_key_values)
4346
4347    def _compress_activation_beacon(self, context: str) -> str:
4348        input_ids = self.tokenizer(context, return_tensors="pt").input_ids.to(
4349            self.device
4350        )
4351
4352        self.model.memory.reset()
4353        self.model(input_ids=input_ids)
4354        past_key_values = self.model.memory.get_memory()
4355
4356        # Log metrics specific to activation beacon
4357        ultragist_size, raw_size, sink_size = self.model.memory.get_memory_size()
4358        logger.info(
4359            "model_metrics",
4360            ultragist_size=ultragist_size,
4361            raw_size=raw_size,
4362            sink_size=sink_size,
4363        )
4364
4365        return self._save_and_return_url(past_key_values)
4366
4367    def _save_and_return_url(self, past_key_values):
4368        """Process output and save to MinIO"""
4369        DynamicCache(past_key_values)
4370
4371        numpy_past_key_values = tuple(
4372            tuple(tensor.to(dtype=torch.float32).cpu().numpy() for tensor in tensors)
4373            for tensors in past_key_values
4374        )
4375
4376        filename = f"{int(time.time_ns())}.npy"
4377        upload_to_minio(
4378            self.minio_client, self.bucket_name, filename, numpy_past_key_values
4379        )
4380
4381        return f"{self.endpoint_url}/{self.bucket_name}/{filename}"
4382
4383    def _validate_minio_config(self):
4384        """Validate MinIO configuration"""
4385        if not self.endpoint_url:
4386            raise ValueError("MINIO_SERVER is not set")
4387        if not self.bucket_name:
4388            raise ValueError("MINIO_BUCKET is not set")
4389        if not os.getenv("MINIO_ACCESS_KEY"):
4390            raise ValueError("MINIO_ACCESS_KEY is not set")
4391        if not os.getenv("MINIO_SECRET_KEY"):
4392            raise ValueError("MINIO_SECRET_KEY is not set")
4393        if not (
4394            self.endpoint_url.startswith("http://")
4395            or self.endpoint_url.startswith("https://")
4396        ):
4397            raise ValueError("MINIO_SERVER must start with http:// or https://")
4398
4399
4400def create_app(algorithm):
4401    app = Flask(__name__)
4402    service = CompressionService(algorithm)
4403
4404    @app.route("/condense", methods=["POST"])
4405    def compress_endpoint():
4406        """Endpoint for compressing context"""
4407        data = request.get_json()
4408        context = data.get("context")
4409        target_model = data.get("target_model")
4410
4411        if not context:
4412            return jsonify({"error": "Missing 'context' in request"}), 400
4413
4414        try:
4415            compressed_kv_url = service.compress_context(context)
4416            return jsonify(
4417                {"target_model": target_model, "compressed_kv_url": compressed_kv_url}
4418            )
4419        except Exception as e:
4420            logger.exception("compression_failed", error=str(e))
4421            return (
4422                jsonify({"error": "Failed to process request", "details": str(e)}),
4423                500,
4424            )
4425
4426    return app
4427
4428
4429# This allows direct running of the file
4430if __name__ == "__main__":
4431    parser = argparse.ArgumentParser()
4432    parser.add_argument(
4433        "--algorithm",
4434        default="kvpress",
4435        choices=["kvpress", "soft_token", "activation_beacon"],
4436    )
4437    args = parser.parse_args()
4438    app = create_app(args.algorithm)
4439    app.run()
4440
4441
4442
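The backend refuses to start unless the MinIO settings checked by `_validate_minio_config` are present. A minimal sketch of the expected environment follows; all values are placeholders:

```python
# Placeholder values only; _validate_minio_config requires all four variables.
import os

os.environ.setdefault("MINIO_SERVER", "http://localhost:9000")   # must start with http:// or https://
os.environ.setdefault("MINIO_BUCKET", "condense_miner")
os.environ.setdefault("MINIO_ACCESS_KEY", "minio-access-key")
os.environ.setdefault("MINIO_SECRET_KEY", "minio-secret-key")

# The client itself is constructed without the scheme and with secure=False, as in the code above.
endpoint = os.environ["MINIO_SERVER"].replace("http://", "").replace("https://", "")
print(endpoint)  # localhost:9000
```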
4443---
4444File: /services/miner_backend/object_cleaner.py
4445---
4446
4447import time
4448import os
4449from minio import Minio
4450import structlog
4451from datetime import datetime, timezone
4452
4453logger = structlog.get_logger()
4454
4455
4456class MinioCleanup:
4457    def __init__(self):
4458        self.bucket_name = os.getenv("MINIO_BUCKET", "condense_miner")
4459        self.minio_client = Minio(
4460            f"localhost:{os.getenv('MINIO_PORT')}",
4461            access_key=os.getenv("MINIO_ACCESS_KEY"),
4462            secret_key=os.getenv("MINIO_SECRET_KEY"),
4463            secure=False,
4464        )
4465
4466    def cleanup_old_objects(self, max_age_hours=1):
4467        """Delete objects older than max_age_hours"""
4468        try:
4469            # Get current time in UTC
4470            now = datetime.now(timezone.utc)
4471
4472            # List all objects in the bucket
4473            objects = list(self.minio_client.list_objects(self.bucket_name))
4474            total_files = len(objects)
4475            expired_files = 0
4476
4477            for obj in objects:
4478                # Calculate age in hours
4479                age = (now - obj.last_modified).total_seconds() / 3600
4480
4481                if age > max_age_hours:
4482                    self.minio_client.remove_object(self.bucket_name, obj.object_name)
4483                    expired_files += 1
4484                    logger.info(
4485                        "deleted_object",
4486                        object_name=obj.object_name,
4487                        age_hours=round(age, 2),
4488                    )
4489
4490            # Log summary statistics
4491            logger.info(
4492                "cleanup_summary",
4493                total_files=total_files,
4494                expired_files=expired_files,
4495                remaining_files=total_files - expired_files,
4496                bucket_name=self.bucket_name,
4497            )
4498
4499        except Exception as e:
4500            logger.exception("cleanup_failed", error=str(e))
4501
4502
4503def main():
4504    cleanup = MinioCleanup()
4505
4506    # Run cleanup every 5 minutes
4507    while True:
4508        cleanup.cleanup_old_objects()
4509        time.sleep(300)  # Sleep for 5 minutes
4510
4511
4512if __name__ == "__main__":
4513    main()
4514
4515
4516
4517---
4518File: /services/miner_backend/universal_app.py
4519---
4520
4521from flask import Flask, request, jsonify
4522from llmlingua import PromptCompressor
4523import structlog
4524import argparse
4525
4526logger = structlog.get_logger()
4527logger.info("This will show in Universal Miner Backend logs")
4528
4529
4530class CompressionService:
4531    def __init__(self, algorithm: str):
4532        self.algorithm = algorithm
4533        self._init_compressor()
4534
4535    def _init_compressor(self):
4536        """Initialize model based on selected algorithm"""
4537        self.device = "cuda"
4538
4539        if self.algorithm == "llmlingua":
4540            self.compressor = PromptCompressor()
4541        elif self.algorithm == "llmlingua-2":
4542            self.compressor = PromptCompressor(
4543                model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
4544                use_llmlingua2=True,
4545            )
4546
4547    def compress_context(self, context: str) -> str:
4548        """Compress context using selected algorithm"""
4549        if self.algorithm == "llmlingua":
4550            compressed_prompt = self.compressor.compress_prompt(
4551                context, instruction="", question="", target_token=200
4552            )
4553        elif self.algorithm == "llmlingua-2":
4554            compressed_prompt = self.compressor.compress_prompt(
4555                context, rate=0.7, force_tokens=["\n", "?"]
4556            )
4557        return compressed_prompt["compressed_prompt"]
4558
4559
4560def create_app(algorithm):
4561    app = Flask(__name__)
4562    service = CompressionService(algorithm)
4563
4564    @app.route("/condense", methods=["POST"])
4565    def compress_endpoint():
4566        logger.info("Join compression endpoint")
4567        """Endpoint for compressing context"""
4568        data = request.get_json()
4569        context = data.get("context")
4570        target_model = data.get("target_model")
4571
4572        if not context:
4573            return jsonify({"error": "Missing 'context' in request"}), 400
4574
4575        try:
4576            compressed_context = service.compress_context(context)
4577            return jsonify(
4578                {"target_model": target_model, "compressed_context": compressed_context}
4579            )
4580        except Exception as e:
4581            logger.exception("compression_failed", error=str(e))
4582            return (
4583                jsonify({"error": "Failed to process request", "details": str(e)}),
4584                500,
4585            )
4586
4587    return app
4588
4589
4590# This allows direct running of the file
4591if __name__ == "__main__":
4592    parser = argparse.ArgumentParser()
4593    parser.add_argument(
4594        "--algorithm",
4595        default="llmlingua-2",
4596        choices=["llmlingua", "llmlingua-2"],
4597    )
4598    args = parser.parse_args()
4599    app = create_app(args.algorithm)
4600    app.run()
4601
4602
4603
4604---
4605File: /services/miner_backend/utils.py
4606---
4607
4608import minio
4609import io
4610import numpy as np
4611
4612
4613def upload_to_minio(
4614    minio_client: minio.Minio,
4615    bucket_name: str,
4616    object_name: str,
4617    data: tuple[tuple[np.ndarray, ...], ...],
4618):
4619    buffer = io.BytesIO()
4620    np.save(buffer, data)
4621    length = buffer.tell()
4622    buffer.seek(0)
4623    result = minio_client.put_object(bucket_name, object_name, buffer, length)
4624    return result
4625
4626
4627
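The nested tuple of per-layer key/value arrays is stacked by `np.save` into a single array, which is what the validator later loads and passes to `DynamicCache.from_legacy_cache`. A sketch of that round trip, mirroring the buffering done in `upload_to_minio` but without the MinIO call, and using made-up toy dimensions:

```python
import io
import numpy as np

# Toy stand-in for a legacy KV cache: one (key, value) pair of arrays per layer.
num_layers, batch, heads, seq, dim = 2, 1, 2, 4, 8
data = tuple(
    (np.zeros((batch, heads, seq, dim), dtype=np.float32),
     np.zeros((batch, heads, seq, dim), dtype=np.float32))
    for _ in range(num_layers)
)

# np.save stacks the nested tuple into one (layers, 2, batch, heads, seq, dim) array.
buffer = io.BytesIO()
np.save(buffer, data)
length = buffer.tell()   # bytes written, used as the MinIO object length
buffer.seek(0)

restored = np.load(buffer)
print(length, restored.shape)  # <bytes written> (2, 2, 1, 2, 4, 8)
```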
4628---
4629File: /services/validator_backend/organic/app.py
4630---
4631
4632import neural_condense_core.validator_utils as vutils
4633from neural_condense_core.base.config import add_common_config, add_validator_config
4634import argparse
4635import bittensor as bt
4636from neural_condense_core.logger import logger
4637import time
4638
4639
4640def setup_config():
4641    parser = argparse.ArgumentParser()
4642    parser = add_common_config(parser)
4643    parser = add_validator_config(parser)
4644    config = bt.config(parser)
4645    logger.info(f"Config: {config}")
4646    return config
4647
4648
4649def setup_bittensor_objects(config: bt.config):
4650    wallet = bt.wallet(config=config)
4651    subtensor = bt.subtensor(config=config)
4652    metagraph = subtensor.metagraph(config.netuid)
4653    return wallet, metagraph
4654
4655
4656def setup_miner_manager(config: bt.config, wallet, metagraph: bt.metagraph):
4657    neuron_uid = metagraph.hotkeys.index(wallet.hotkey.ss58_address)
4658    miner_manager = vutils.managing.MinerManager(
4659        uid=neuron_uid, wallet=wallet, metagraph=metagraph, config=None
4660    )
4661    return miner_manager
4662
4663
4664def setup_organic_gate(config: bt.config, miner_manager: vutils.managing.MinerManager):
4665    organic_gate = vutils.monetize.OrganicGate(
4666        miner_manager=miner_manager, config=config
4667    )
4668    return organic_gate
4669
4670
4671def main():
4672    config = setup_config()
4673    wallet, metagraph = setup_bittensor_objects(config)
4674    miner_manager = setup_miner_manager(config, wallet, metagraph)
4675    organic_gate = setup_organic_gate(config, miner_manager)
4676    organic_gate.start_server()
4677
4678
4679if __name__ == "__main__":
4680    main()
4681    while True:
4682        time.sleep(60)
4683        logger.info("Running...")
4684
4685
4686
4687---
4688File: /services/validator_backend/scoring/anti_exploitation/filter_existance.py
4689---
4690
4691from transformers import AutoTokenizer, AutoModelForCausalLM, DynamicCache
4692from typing import List
4693import random
4694from semantic_text_splitter import TextSplitter
4695from copy import deepcopy
4696from datasets import load_dataset
4697from typing import Tuple
4698from ..utils import generate_answer
4699import re
4700import structlog
4701
4702logger = structlog.get_logger("filter_existance")
4703
4704
4705class FilterExistanceChecker:
4706    def _check_text_exists(
4707        self,
4708        tokenizer: AutoTokenizer,
4709        model: AutoModelForCausalLM,
4710        kv_cache: DynamicCache,
4711        query_chunk: str,
4712        context_length: int,
4713    ) -> Tuple[bool, bool]:
4714        _kv_cache = deepcopy(kv_cache)
4715        prompt = f"""
4716You are a precise and objective fact-checker. Your task is to determine whether the following quoted text appears in the provided context or is a direct paraphrase of it. 
4717
4718Instructions:
4719- Consider the context to include information that might have been rephrased but retains the original meaning.
4720- Return 'yes' if the quoted text appears or is a clear paraphrase of the context.
4721- Return 'no' if the quoted text does not appear or if it is not a valid paraphrase.
4722- Your response should contain exactly one word: either 'yes' or 'no'. No additional text or explanations are required.
4723
4724Quote:
4725```
4726{query_chunk}
4727```
4728[/INST] """
4729        logger.info(f"Filter Prompt: {prompt}")
4730        prompt_ids = tokenizer.encode(
4731            prompt,
4732            return_tensors="pt",
4733            add_special_tokens=False,
4734        )
4735        completion_text = generate_answer(
4736            model=model,
4737            tokenizer=tokenizer,
4738            question_ids=prompt_ids,
4739            cache=_kv_cache,
4740            context_length=context_length,
4741            max_new_tokens=8,
4742        )
4743        logger.info(f"Filter Completion: {completion_text}")
4744        # Split response into words and clean up
4745        words = re.findall(r"\b\w+\b", completion_text.lower())
4746        return "yes" in words, "no" in words or "not" in words
4747
4748    def filter_existance(
4749        self,
4750        tokenizer: AutoTokenizer,
4751        model: AutoModelForCausalLM,
4752        kv_cache: DynamicCache,
4753        positive_chunks: List[str],
4754        negative_chunks: List[str],
4755        context_length: int,
4756    ) -> float:
4757        positive_accuracies = []
4758        for positive_chunk in positive_chunks:
4759            exist_yes, exist_no = self._check_text_exists(
4760                tokenizer, model, kv_cache, positive_chunk, context_length
4761            )
4762            if not exist_yes or (exist_yes and exist_no):
4763                positive_accuracies.append(0)
4764            else:
4765                positive_accuracies.append(1)
4766        if not any(positive_accuracies):
4767            logger.info(f"All positive existance check failed: {positive_accuracies}")
4768            return 0
4769        negative_accuracies = []
4770        # Test on negative case (text not from conversation)
4771        for negative_chunk in negative_chunks:
4772            exist_yes, exists_no = self._check_text_exists(
4773                tokenizer, model, kv_cache, negative_chunk, context_length
4774            )
4775            if not exists_no or (exist_yes and exists_no):
4776                negative_accuracies.append(0)
4777            else:
4778                negative_accuracies.append(1)
4779        if not any(negative_accuracies):
4780            logger.info(f"All negative existance check failed: {negative_accuracies}")
4781            return 0
4782        accuracies = positive_accuracies + negative_accuracies
4783        logger.info(f"Filter Existance Accuracy: {accuracies}")
4784        return sum(accuracies) / len(accuracies)
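
For orientation, the aggregation above is a plain mean over per-chunk pass/fail flags, zeroed out early if every positive check fails or every negative check fails. A minimal, self-contained sketch of that arithmetic (dummy flags only, no model calls; the function name is illustrative):

```python
def aggregate_existence(positive_flags: list[int], negative_flags: list[int]) -> float:
    # Mirrors FilterExistanceChecker.filter_existance: zero if no positive chunk is
    # confirmed, or if no negative chunk is rejected; otherwise the overall mean.
    if not any(positive_flags) or not any(negative_flags):
        return 0.0
    flags = positive_flags + negative_flags
    return sum(flags) / len(flags)

# e.g. 2 of 3 positive chunks confirmed, 1 of 2 negative chunks rejected -> (2 + 1) / 5 = 0.6
print(aggregate_existence([1, 1, 0], [1, 0]))
```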
4785
4786
4787
4788---
4789File: /services/validator_backend/scoring/metric_handlers/__init__.py
4790---
4791
4792from .accuracy import accuracy, preprocess_batch as accuracy_preprocess_batch
4793
4794metric_handlers = {
4795    "accuracy": {
4796        "handler": accuracy,
4797        "preprocess_batch": accuracy_preprocess_batch,
4798    },
4799}
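
The registry above maps each criterion name to its scoring handler and batch preprocessor, which the Flask scoring app resolves at request time. A small hedged lookup sketch (the import path mirrors the repo layout and may need adjusting):

```python
from services.validator_backend.scoring.metric_handlers import metric_handlers

entry = metric_handlers["accuracy"]
handler, preprocess = entry["handler"], entry["preprocess_batch"]

# preprocess_batch replaces failed (None) scores with the default value of 0.
print(preprocess([0.8, None, 1.0]))  # -> [0.8, 0, 1.0]
```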
4800
4801
4802
4803---
4804File: /services/validator_backend/scoring/metric_handlers/accuracy.py
4805---
4806
4807import torch
4808from transformers import (
4809    AutoTokenizer,
4810    DynamicCache,
4811    AutoModelForCausalLM,
4812    TextGenerationPipeline,
4813)
4814import structlog
4815from ..anti_exploitation.filter_existance import FilterExistanceChecker
4816from ..utils import generate_answer
4817from neural_condense_core.protocol import TaskData
4818from copy import deepcopy
4819
4820logger = structlog.get_logger("accuracy")
4821
4822DEFAULT_VALUE = 0
4823
4824
4825def accuracy(
4826    filter_existance_checker: FilterExistanceChecker,
4827    kv_cache: DynamicCache,
4828    task_data: TaskData,
4829    tokenizer: AutoTokenizer,
4830    model: AutoModelForCausalLM,
4831    judge_pipeline: TextGenerationPipeline,
4832    max_tokens: int = 256,
4833    **kwargs,
4834) -> float:
4835    context = task_data.formatted_context
4836    positive_chunks = task_data.positive_chunks
4837    negative_chunks = task_data.negative_chunks
4838    formatted_questions = task_data.formatted_questions
4839    questions = task_data.challenge_questions
4840    answers = task_data.challenge_answers
4841
4842    device = model.device
4843    context_ids = tokenizer.encode(
4844        context,
4845        return_tensors="pt",
4846        add_special_tokens=False,
4847    ).to(device=device, dtype=torch.long)
4848    context_length = context_ids.shape[1]
4849    num_seen_tokens = kv_cache._seen_tokens
4850    logger.debug("condense-length", length=num_seen_tokens)
4851    chunk_existance_accuracy: float = filter_existance_checker.filter_existance(
4852        tokenizer=tokenizer,
4853        model=model,
4854        kv_cache=kv_cache,
4855        positive_chunks=positive_chunks,
4856        negative_chunks=negative_chunks,
4857        context_length=context_length,
4858    )
4859    if chunk_existance_accuracy <= 0.1:
4860        logger.info(
4861            f"Too low chunk existance accuracy, skipping scoring: {chunk_existance_accuracy}"
4862        )
4863        return 0
4864
4865    questions_ids = [
4866        tokenizer(
4867            question,
4868            return_tensors="pt",
4869            add_special_tokens=False,
4870            max_length=max_tokens,
4871        ).input_ids.to(device=device, dtype=torch.long)
4872        for question in formatted_questions
4873    ]
4874    accuracies = []
4875    for question_ids, formatted_question, answer, question in zip(
4876        questions_ids, formatted_questions, answers, questions
4877    ):
4878        expected_completion_ids = tokenizer(
4879            answer,
4880            return_tensors="pt",
4881            add_special_tokens=False,
4882        ).input_ids.to(device=device, dtype=torch.long)
4883        n_expected_completion_tokens = expected_completion_ids.shape[1]
4884        max_new_tokens = max(int(n_expected_completion_tokens * 1.5), 8)
4885        _kv_cache = deepcopy(kv_cache)
4886        logger.debug("kv_length", length=_kv_cache._seen_tokens)
4887        completion = generate_answer(
4888            model=model,
4889            tokenizer=tokenizer,
4890            question_ids=question_ids,
4891            cache=_kv_cache,
4892            context_length=context_length,
4893            max_new_tokens=max_new_tokens,
4894        )
4895        ground_truth = answer.strip()
4896        logger.debug(f"Question: {formatted_question}")
4897        logger.debug(f"Completion: {completion}")
4898        logger.debug(f"Ground truth: {ground_truth}")
4899        accuracy = get_accuracy_llm(completion, ground_truth, question, judge_pipeline)
4900        accuracies.append(accuracy)
4901    logger.info(f"Accuracies: {accuracies}")
4902    return chunk_existance_accuracy * sum(accuracies) / len(accuracies)
4903
4904
4905def preprocess_batch(values: list[float]) -> list[float]:
4906    return [value if value is not None else DEFAULT_VALUE for value in values]
4907
4908
4909def get_accuracy_llm(
4910    completion: str,
4911    ground_truth: str,
4912    question: str,
4913    judge_pipeline: TextGenerationPipeline,
4914) -> float:
4915    messages = [
4916        {
4917            "role": "system",
4918            "content": "You are a helpful assistant that evaluates the correctness of a response to a question based on the ground truth.",
4919        },
4920        {
4921            "role": "user",
4922            "content": f"""Please evaluate the correctness of the following response to the question based on the ground truth.\n\n**Question**: {question}\n\n**Response**: {completion}\n\n**Ground truth**: {ground_truth}
4923You must return 'yes' if the response is correct and 'no' if it is incorrect. The response counts as correct if it has the same meaning as the ground truth; it does not need to match exactly. Return only 'yes' or 'no', with no explanation.
4924""",
4925        },
4926    ]
4927    completion = judge_pipeline(
4928        messages,
4929        do_sample=False,
4930        max_new_tokens=16,
4931    )[0][
4932        "generated_text"
4933    ][-1]["content"]
4934    logger.debug(f"LLM Judge Response: {completion}")
4935    is_correct = "yes" in completion.lower()
4936    return 1 if is_correct else 0
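
The value returned by `accuracy` above is the chunk-existence accuracy multiplied by the mean of the per-question judge verdicts, with an early zero when the existence accuracy is at or below 0.1. A minimal sketch of that combination (dummy numbers, no models; names are illustrative):

```python
def combine_scores(chunk_existence_accuracy: float, qa_verdicts: list[int]) -> float:
    # Mirrors the gating and weighting in accuracy().
    if chunk_existence_accuracy <= 0.1:
        return 0.0
    return chunk_existence_accuracy * sum(qa_verdicts) / len(qa_verdicts)

# e.g. existence accuracy 0.8, judge marks 2 of 3 answers correct -> 0.8 * 2/3 ~= 0.53
print(combine_scores(0.8, [1, 1, 0]))
```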
4937
4938
4939
4940---
4941File: /services/validator_backend/scoring/app.py
4942---
4943
4944# TODO: Efficient switching between target models. Currently fixed to mistral-7b-instruct-v0.2.
4945from flask import Flask, request, jsonify
4946import torch
4947from transformers import (
4948    AutoTokenizer,
4949    AutoModelForCausalLM,
4950    DynamicCache,
4951    TextGenerationPipeline,
4952)
4953import random
4954import structlog
4955import gc
4956from neural_condense_core.protocol import BatchedScoringRequest
4957import traceback
4958from .metric_handlers import metric_handlers
4959from .anti_exploitation.filter_existance import FilterExistanceChecker
4960import time
4961import numpy as np
4962import io
4963
4964gc.enable()
4965
4966logger = structlog.get_logger("Validator-Backend")
4967
4968
4969def load_compressed_kv(filename: str) -> np.ndarray:
4970    with open(filename, "rb") as f:
4971        buffer = io.BytesIO(f.read())
4972        return np.load(buffer).astype(np.float32)
4973
4974
4975class ScoringService:
4976    def __init__(self):
4977        """
4978        Initializes the ScoringService: loads the target model and tokenizer, the LLM judge
4979        pipeline, and the chunk-existence filter on the configured device.
4980        """
4981        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
4982        self.dtype = torch.bfloat16
4983        self.model = AutoModelForCausalLM.from_pretrained(
4984            "Condense-AI/Mistral-7B-Instruct-v0.2"
4985        ).to(dtype=self.dtype, device=self.device)
4986        self.tokenizer = AutoTokenizer.from_pretrained(
4987            "Condense-AI/Mistral-7B-Instruct-v0.2"
4988        )
4989
4990        j_tokenizer = AutoTokenizer.from_pretrained(
4991            "upstage/solar-pro-preview-instruct"
4992        )
4993        j_model = AutoModelForCausalLM.from_pretrained(
4994            "upstage/solar-pro-preview-instruct",
4995            torch_dtype=self.dtype,
4996            trust_remote_code=True,
4997        )
4998        self.judge_pipeline = TextGenerationPipeline(
4999            model=j_model,
5000            tokenizer=j_tokenizer,
5001            device=self.device,
5002            torch_dtype=self.dtype,
5003        )
5004        self.filter_existance_checker = FilterExistanceChecker()
5005
5006    @torch.no_grad()
5007    def get_metrics(self, request: BatchedScoringRequest) -> dict[str, float]:
5008        logger.info("Received request")
5009        criteria = random.choice(request.criterias)
5010        values = []
5011        metric_handler = metric_handlers[criteria]["handler"]
5012        preprocess_batch = metric_handlers[criteria]["preprocess_batch"]
5013        logger.info(
5014            "positive_chunks",
5015            positive_chunks=request.task_data.positive_chunks,
5016        )
5017        logger.info(
5018            "negative_chunks",
5019            negative_chunks=request.task_data.negative_chunks,
5020        )
5021        for miner_response in request.miner_responses:
5022            try:
5023                kv_cache = DynamicCache.from_legacy_cache(
5024                    torch.from_numpy(load_compressed_kv(miner_response.filename)).to(
5025                        device=self.device, dtype=self.dtype
5026                    )
5027                )
5028                start_time = time.time()
5029                value = metric_handler(
5030                    filter_existance_checker=self.filter_existance_checker,
5031                    kv_cache=kv_cache,
5032                    model=self.model,
5033                    tokenizer=self.tokenizer,
5034                    task_data=request.task_data,
5035                    judge_pipeline=self.judge_pipeline,
5036                )
5037                end_time = time.time()
5038                logger.info(
5039                    "metric_handler_time",
5040                    handler_name=metric_handler.__name__,
5041                    time_taken=f"{end_time - start_time:.2f}s",
5042                )
5043            except Exception as e:
5044                logger.error(
5045                    "metric_handler_error",
5046                    error=str(e),
5047                    handler_name=metric_handler.__name__,
5048                    traceback=traceback.format_exc(),
5049                )
5050                value = None
5051            values.append(value)
5052            logger.info(
5053                "metric_value", handler_name=metric_handler.__name__, value=value
5054            )
5055        values = preprocess_batch(values)
5056        return {"metrics": {criteria: values}}
5057
5058
5059app = Flask(__name__)
5060scoring_service = ScoringService()
5061
5062
5063@app.route("/", methods=["GET"])
5064def is_alive():
5065    return jsonify({"message": "I'm alive!"})
5066
5067
5068@app.route("/get_metrics", methods=["POST"])
5069def get_metrics():
5070    request_data = BatchedScoringRequest(**request.get_json())
5071    return jsonify(scoring_service.get_metrics(request_data))
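
A hedged client sketch against the Flask endpoints above. The payload keys follow the fields the handlers read (`criterias`, `miner_responses[*].filename`, `task_data.*`); the exact schema lives in `neural_condense_core.protocol.BatchedScoringRequest`, and the host, port, and file path below are placeholders:

```python
import requests

BASE = "http://localhost:8000"  # placeholder; use the address the backend is served on

payload = {
    "criterias": ["accuracy"],
    "miner_responses": [{"filename": "/tmp/compressed_kv_0.npy"}],  # miner's compressed KV dump
    "task_data": {
        "formatted_context": "...",
        "formatted_questions": ["..."],
        "challenge_questions": ["..."],
        "challenge_answers": ["..."],
        "positive_chunks": ["..."],
        "negative_chunks": ["..."],
    },
}

print(requests.get(f"{BASE}/").json())                            # liveness check
print(requests.post(f"{BASE}/get_metrics", json=payload).json())  # scoring request
```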
5072
5073
5074
5075---
5076File: /services/validator_backend/scoring/datatypes.py
5077---
5078
5079from typing import Any, List
5080from pydantic import BaseModel
5081import numpy as np
5082import io
5083import base64
5084
5085
5086class MinerResponse(BaseModel):
5087    filename: str
5088    compressed_kv: Any = None
5089
5090    def decode(self):
5091        self.compressed_kv = load_npy_from_filename(self.filename)
5092
5093
5094class GroundTruthRequest(BaseModel):
5095    context: str
5096    expected_completion: str
5097    activation_prompt: str
5098    model_name: str
5099    messages: List[dict]
5100    hidden_messages: List[dict]
5101    criterias: List[str]
5102    positive_chunks: List[str]
5103    negative_chunks: List[str]
5104
5105
5106class BatchedScoringRequest(BaseModel):
5107    miner_responses: List[MinerResponse]
5108    ground_truth_request: GroundTruthRequest
5109
5110
5111def load_npy_from_filename(filename: str) -> np.ndarray:
5112    with open(filename, "rb") as f:
5113        buffer = io.BytesIO(f.read())
5114        return np.load(buffer).astype(np.float32)
5115
5116
5117def base64_to_ndarray(base64_str: str) -> np.ndarray:
5118    """Convert a base64-encoded string back to a NumPy array."""
5119    try:
5120        buffer = io.BytesIO(base64.b64decode(base64_str))
5121        buffer.seek(0)
5122        array = np.load(buffer)
5123        array = array.astype(np.float32)
5124    except Exception as e:
5125        print(e)
5126        return None
5127    return array
5128
5129
5130def ndarray_to_base64(array: np.ndarray) -> str:
5131    """Convert a NumPy array to a base64-encoded string."""
5132    try:
5133        buffer = io.BytesIO()
5134        np.save(buffer, array)
5135        buffer.seek(0)
5136        base64_str = base64.b64encode(buffer.read()).decode("utf-8")
5137    except Exception as e:
5138        print(e)
5139        return ""
5140    return base64_str
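
The two base64 helpers above are inverses; a quick round-trip check (the import path mirrors the repo layout and may need adjusting):

```python
import numpy as np
from services.validator_backend.scoring.datatypes import ndarray_to_base64, base64_to_ndarray

original = np.random.rand(2, 4, 8).astype(np.float32)  # toy stand-in for a compressed KV tensor
decoded = base64_to_ndarray(ndarray_to_base64(original))
assert decoded is not None and np.allclose(original, decoded)
```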
5141
5142
5143
5144---
5145File: /services/validator_backend/scoring/utils.py
5146---
5147
5148from .datatypes import (
5149    MinerResponse,
5150    GroundTruthRequest,
5151    BatchedScoringRequest,
5152    ndarray_to_base64,
5153)
5154import torch
5155from transformers import AutoModelForCausalLM, AutoTokenizer, DynamicCache
5156
5157
5158def unit_test(self):
5159    """
5160    Runs a basic unit test to verify the setup and scoring functions for a sample request.
5161    """
5162    try:
5163        data = {
5164            "context": "<s> [INST] Provided the context: French senior civil servant arrested on suspicion of spying for North Korea\n\nNovember 27, 2018 by Joseph Fitsanakis\n\nA senior civil servant in the upper house of the French parliament has been arrested on suspicion of spying for North Korea, according to prosecutors. The news of the suspected spy\u2019s arrest was first reported on Monday by Quotidien, a daily politics and culture show on the Monaco-based television channel TMC. The show cited \u201ca judicial source in Paris\u201d and said that France\u2019s domestic security and counterintelligence agency, the General Directorate for Internal Security (DGSI), was in charge of the espionage case.\n\nThe senior administrator has been identified as Benoit Quennedey, a civil servant who liaises between the French Senate and the Department of Architecture and Heritage, which operates under France\u2019s Ministry of Culture. Quennedey was reportedly detained on Sunday morning and his office in the French Senate was raided by DGSI officers on the same day. Quotidien said that he was arrested on suspicion of \u201ccollecting and delivering to a foreign power information likely to subvert core national interests\u201d. The report did not provide specific information about the type of information that Quennedey is believed to have passed to North Korea. It did state, however, that a counterintelligence investigation into his activities began in March of this year.\n\nQuennedey is believed to be the president of the Franco-Korean Friendship Association, the French branch of a Spanish-based organization that lobbies in favor of international support for North Korea. Korea Friendship Association branches exist in over 30 countries and are believed to be officially sanctioned by Pyongyang. They operate as something akin to the pre-World War II Comintern (Communist International), a Moscow-sanctioned international pressure group that advocated in favor of Soviet-style communism around the world. French media reported on Monday that Quennedey traveled extensively to the Korean Peninsula in the past decade and has written a French-language book on North Korea. News reports said that the French President Emmanuel Macron had been made aware of Quennedey\u2019s arrest. The senior civil servant faces up to 30 years in prison if found guilty of espionage.\n\n\u25ba Author: Joseph Fitsanakis | Date: 27 November 2018 | Permalink\n\n",
5165            "activation_prompt": "Identify the person arrested on suspicion of spying for North Korea. [/INST]",
5166            "expected_completion": "Benoit Quennedey",
5167        }
5168        criterias = ["perplexity"]
5169
5170        context_ids = self.tokenizer(
5171            data["context"],
5172            return_tensors="pt",
5173            truncation=False,
5174            padding=False,
5175            add_special_tokens=False,
5176        )["input_ids"].to(self.device)
5177
5178        context_embeds = self.model.get_input_embeddings()(context_ids).squeeze(0)
5179        compressed_tokens = context_embeds.detach().cpu().numpy().tolist()
5180        compressed_tokens_b64 = ndarray_to_base64(compressed_tokens)
5181        miner_response = MinerResponse(compressed_tokens_b64=compressed_tokens_b64)
5182        ground_truth_request = GroundTruthRequest(
5183            context=data["context"],
5184            activation_prompt=data["activation_prompt"],
5185            expected_completion=data["expected_completion"],
5186            criterias=criterias,
5187        )
5188        request = BatchedScoringRequest(
5189            miner_responses=[miner_response],
5190            ground_truth_request=ground_truth_request,
5191        )
5192        self.get_metrics(request)
5193
5194        ground_truth_request.activation_prompt = (
5195            "Write exactly the same context as provided. [/INST]"
5196        )
5197        ground_truth_request.expected_completion = data["context"]
5198
5199        request = BatchedScoringRequest(
5200            miner_responses=[miner_response],
5201            ground_truth_request=ground_truth_request,
5202        )
5203        self.get_metrics(request)
5204    except Exception as e:
5205        print(f"Error in unit_test: {e}")
5206
5207
5208def generate_answer(
5209    model: AutoModelForCausalLM,
5210    tokenizer: AutoTokenizer,
5211    question_ids: torch.Tensor,
5212    cache: DynamicCache,
5213    context_length: int,
5214    max_new_tokens: int,
5215) -> str:
5216    """
5217    Generate an answer to a question using greedy decoding.
5218
5219    Parameters
5220    ----------
5221    question_ids : torch.Tensor
5222        The tokenized question.
5223    cache : Cache
5224        The compressed key-value cache.
5225    context_length : int
5226        The length of the context.
5227    max_new_tokens : int
5228        The maximum number of new tokens to generate.
5229
5230    Returns
5231    -------
5232    str
5233        The generated answer.
5234    """
5235
5236    cache_seq_lengths = [
5237        cache.get_seq_length(layer_idx) for layer_idx in range(len(cache))
5238    ]
5239    position_ids = torch.arange(
5240        context_length, context_length + question_ids.shape[1], device=model.device
5241    ).unsqueeze(0)
5242
5243    # Forward pass over the question tokens, reusing the compressed KV cache as past context
5244    outputs = model(
5245        input_ids=question_ids.to(model.device),
5246        past_key_values=cache,
5247        position_ids=position_ids,
5248        num_logits_to_keep=1,
5249    )
5250
5251    position_ids = position_ids[:, -1:] + 1
5252    generated_ids = [outputs.logits[0, -1].argmax()]
5253
5254    should_stop_token_ids = model.generation_config.eos_token_id
5255    if not isinstance(should_stop_token_ids, list):
5256        should_stop_token_ids = [should_stop_token_ids]
5257
5258    for i in range(max_new_tokens - 1):
5259        outputs = model(
5260            input_ids=generated_ids[-1].unsqueeze(0).unsqueeze(0),
5261            past_key_values=cache,
5262            position_ids=position_ids + i,
5263        )
5264        new_id = outputs.logits[0, -1].argmax()
5265        generated_ids.append(new_id)
5266        if new_id.item() in should_stop_token_ids:
5267            break
5268    answer = tokenizer.decode(torch.stack(generated_ids), skip_special_tokens=True)
5269
5270    key_attr, value_attr = "key_cache", "value_cache"
5271
5272    setattr(
5273        cache,
5274        key_attr,
5275        [key[:, :, :c] for key, c in zip(getattr(cache, key_attr), cache_seq_lengths)],
5276    )
5277    setattr(
5278        cache,
5279        value_attr,
5280        [
5281            value[:, :, :c]
5282            for value, c in zip(getattr(cache, value_attr), cache_seq_lengths)
5283        ],
5284    )
5285
5286    return answer
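
The slicing at the end of `generate_answer` restores each layer of the cache to its pre-decoding length, so the same compressed cache can be probed repeatedly without growing. A self-contained toy illustration of that trimming, assuming the same `DynamicCache` API (transformers >= 4.36) the repo already relies on:

```python
import torch
from transformers import DynamicCache

# Toy single-layer cache: tensors are (batch, heads, seq_len, head_dim).
cache = DynamicCache()
cache.update(torch.zeros(1, 2, 10, 4), torch.zeros(1, 2, 10, 4), layer_idx=0)
original_len = cache.get_seq_length(0)  # recorded before decoding, as in generate_answer

# Decoding appends new tokens to the cache ...
cache.update(torch.zeros(1, 2, 3, 4), torch.zeros(1, 2, 3, 4), layer_idx=0)
assert cache.get_seq_length(0) == original_len + 3

# ... and the final trim slices each layer back to its original length.
cache.key_cache[0] = cache.key_cache[0][:, :, :original_len]
cache.value_cache[0] = cache.value_cache[0][:, :, :original_len]
assert cache.get_seq_length(0) == original_len
```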
5287
5288
5289
5290---
5291File: /services/validator_backend/universal_scoring/app.py
5292---
5293
5294from fastapi import FastAPI
5295import numpy as np
5296from together import Together
5297from typing import List
5298import logging
5299from pydantic import BaseModel
5300from neural_condense_core import logger
5301
5302
5303class TaskData(BaseModel):
5304    formatted_context: str = ""
5305    original_context: str = ""
5306    challenge_questions: List[str] = []
5307    challenge_answers: List[str] = []
5308    formatted_questions: List[str] = []
5309    negative_chunks: List[str] = []
5310    positive_chunks: List[str] = []
5311
5312
5313class UtilData(BaseModel):
5314    compressed_kv_b64: str = ""
5315    compressed_length: int = 0
5316    download_time: float = 0.0
5317    bonus_compress_size: float = 0.0
5318    bonus_time: float = 0.0
5319    local_filename: str = ""
5320
5321
5322class MinerResponse(BaseModel):
5323    filename: str = ""
5324    compressed_context: str = ""
5325
5326
5327class BatchedScoringRequest(BaseModel):
5328    miner_responses: List[MinerResponse] = []
5329    task_data: TaskData = TaskData()
5330    target_model: str = ""
5331    criterias: List[str] = []
5332
5333
5334# logger = logging.getLogger("uvicorn")
5335logger.info("This will show in Universal Validator Backend logs")
5336
5337app = FastAPI()
5338
5339openai_client = Together()
5340
5341
5342@app.post("/get_metrics")
5343async def get_metrics(item: BatchedScoringRequest):
5344    logger.info(f"Received scoring request for model: {item.target_model}")
5345
5346    model = item.target_model
5347    if "Llama-3.1-8B-Instruct" in model:
5348        model = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo-128K"
5349
5350    compressed_contexts = [
5351        item.miner_responses[i].compressed_context
5352        for i in range(len(item.miner_responses))
5353    ]
5354    questions = item.task_data.challenge_questions
5355    ground_truths = item.task_data.challenge_answers
5356    negative_chunks = item.task_data.negative_chunks
5357    positive_chunks = item.task_data.positive_chunks
5358
5359    logger.info(f"Processing {len(compressed_contexts)} miner responses")
5360    logger.info(f"Number of questions: {len(questions)}")
5361    logger.info(f"Number of positive chunks: {len(positive_chunks)}")
5362    logger.info(f"Number of negative chunks: {len(negative_chunks)}")
5363    logger.info(f"Number of compressed contexts: {len(compressed_contexts)}")
5364    existence_scores = []
5365    for i, compressed_context in enumerate(compressed_contexts):
5366        logger.info(
5367            f"Getting existence score for response {i+1}/{len(compressed_contexts)}"
5368        )
5369        score = await get_chunk_existence_score(
5370            compressed_context, positive_chunks, negative_chunks, model
5371        )
5372        logger.info(f"Raw existence score: {score}, type: {type(score)}")
5373        if score is None or np.isnan(score):
5374            logger.error(f"Invalid existence score for response {i+1}")
5375            score = 0.0
5376        existence_scores.append(score)
5377        logger.info(f"Existence score for response {i+1}: {score}")
5378
5379    qa_scores = []
5380    for i, compressed_context in enumerate(compressed_contexts):
5381        logger.info(f"Getting QA score for response {i+1}/{len(compressed_contexts)}")
5382        score = await get_qa_score(compressed_context, questions, ground_truths, model)
5383        logger.info(f"Raw QA score: {score}, type: {type(score)}")
5384        if score is None or np.isnan(score):
5385            logger.error(f"Invalid QA score for response {i+1}")
5386            score = 0.0
5387        qa_scores.append(score)
5388        logger.info(f"QA score for response {i+1}: {score}")
5389
5390    final_scores = []
5391    for i, (existence_score, qa_score) in enumerate(zip(existence_scores, qa_scores)):
5392        logger.info(f"Calculating final score for response {i+1}")
5393        logger.info(f"Using existence_score={existence_score}, qa_score={qa_score}")
5394        if (
5395            existence_score is None
5396            or qa_score is None
5397            or np.isnan(existence_score)
5398            or np.isnan(qa_score)
5399        ):
5400            logger.error(f"Invalid scores for response {i+1}")
5401            final_score = 0.0
5402        else:
5403            final_score = (existence_score + qa_score) / 2
5404        final_scores.append(final_score)
5405        logger.info(f"Final score for response {i+1}: {final_score}")
5406
5407    avg_score = np.mean(final_scores) if final_scores else 0.0
5408    logger.info(f"Completed scoring. Raw scores: {final_scores}")
5409    logger.info(f"Completed scoring. Average final score: {avg_score:.4f}")
5410    return {"metrics": {"accuracy": final_scores}}
5411
5412
5413async def get_qa_score(
5414    compressed_context: str,
5415    questions: List[str],
5416    ground_truths: List[str],
5417    model: str,
5418):
5419    logger.info("Starting QA scoring")
5420    prompt = """
5421You are given a context and a question. Your task is to answer the question based on the context.
5422---CONTEXT---
5423{compressed_context}
5424---QUESTION---
5425{question}
5426---END---
5427Your response should be concise and to the point. Skip any greetings or other irrelevant information.
5428"""
5429    answers = []
5430
5431    for i, question in enumerate(questions):
5432        logger.info(f"Processing question {i+1}/{len(questions)}")
5433        messages = [
5434            {
5435                "role": "user",
5436                "content": prompt.format(
5437                    compressed_context=compressed_context, question=question
5438                ),
5439            },
5440        ]
5441        try:
5442            response = openai_client.chat.completions.create(
5443                model=model, messages=messages, temperature=0
5444            )
5445            text = response.choices[0].message.content
5446            answers.append(text)
5447        except Exception as e:
5448            logger.error(f"Error getting answer for question {i+1}: {str(e)}")
5449            raise
5450
5451    judge_prompt = """
5452You are given a set of question, answer, and ground truth. Your task is to determine whether the answer is correct.
5453---QUESTION---
5454{question}
5455---ANSWER---
5456{answer}
5457---GROUND TRUTH---
5458{ground_truth}
5459---END---
5460You only need to output one word: either 'yes' or 'no'. No additional text or explanations are required.
5461An answer is correct if it mentions the important points from the ground truth.
5462"""
5463    scores = []
5464    for i, (question, answer, ground_truth) in enumerate(
5465        zip(questions, answers, ground_truths)
5466    ):
5467        logger.info(f"Judging answer {i+1}/{len(questions)}")
5468        messages = [
5469            {
5470                "role": "user",
5471                "content": judge_prompt.format(
5472                    question=question, answer=answer, ground_truth=ground_truth
5473                ),
5474            },
5475        ]
5476        try:
5477            response = openai_client.chat.completions.create(
5478                model=model, messages=messages, temperature=0
5479            )
5480            text = response.choices[0].message.content
5481            text = text.strip().lower()
5482            words = text.split()
5483            result = "yes" in words and not ("no" in words or "not" in words)
5484            scores.append(result)
5485            logger.info(f"Answer {i+1} scored: {result}")
5486        except Exception as e:
5487            logger.error(f"Error judging answer {i+1}: {str(e)}")
5488            raise
5489
5490    if not scores:
5491        logger.warning("No valid scores generated in QA scoring")
5492        return 0.0
5493
5494    score = np.mean(scores)
5495    logger.info(f"Final QA score: {score:.4f}")
5496    return score
5497
5498
5499async def get_chunk_existence_score(
5500    compressed_context: str,
5501    positive_chunks: List[str],
5502    negative_chunks: List[str],
5503    model: str,
5504):
5505    logger.info("Starting chunk existence scoring")
5506    prompt = """
5507You are given a context and a chunk of text. Your task is to determine whether the chunk content is mentioned in the context.
5508---CONTEXT---
5509{compressed_context}
5510---CHUNK---
5511{chunk}
5512---END---
5513Your response should contain exactly one word: either 'yes' or 'no'. No additional text or explanations are required.
5514"""
5515    positive_scores = []
5516    negative_scores = []
5517
5518    for i, chunk in enumerate(positive_chunks):
5519        logger.info(f"Processing positive chunk {i+1}/{len(positive_chunks)}")
5520        messages = [
5521            {
5522                "role": "user",
5523                "content": prompt.format(
5524                    compressed_context=compressed_context, chunk=chunk
5525                ),
5526            },
5527        ]
5528        try:
5529            response = openai_client.chat.completions.create(
5530                model=model, messages=messages, temperature=0
5531            )
5532            text = response.choices[0].message.content
5533            text = text.strip().lower()
5534            words = text.split()
5535            result = "yes" in words and not ("no" in words or "not" in words)
5536            positive_scores.append(result)
5537            logger.info(f"Positive chunk {i+1} scored: {result}")
5538        except Exception as e:
5539            logger.error(f"Error processing positive chunk {i+1}: {str(e)}")
5540            raise
5541
5542    for i, chunk in enumerate(negative_chunks):
5543        logger.info(f"Processing negative chunk {i+1}/{len(negative_chunks)}")
5544        messages = [
5545            {
5546                "role": "user",
5547                "content": prompt.format(
5548                    compressed_context=compressed_context, chunk=chunk
5549                ),
5550            },
5551        ]
5552        try:
5553            response = openai_client.chat.completions.create(
5554                model=model, messages=messages, temperature=0
5555            )
5556            text = response.choices[0].message.content
5557            text = text.strip().lower()
5558            words = text.split()
5559            result = ("no" in words or "not" in words) and "yes" not in words
5560            negative_scores.append(result)
5561            logger.info(f"Negative chunk {i+1} scored: {result}")
5562        except Exception as e:
5563            logger.error(f"Error processing negative chunk {i+1}: {str(e)}")
5564            raise
5565
5566    if not positive_scores and not negative_scores:
5567        logger.warning("No valid scores generated in chunk existence scoring")
5568        return 0.0
5569
5570    score = np.mean(positive_scores + negative_scores)
5571    logger.info(f"Final existence score: {score:.4f}")
5572    return score
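
Because the request and response models are declared inline above, a hedged client sketch for this FastAPI service is straightforward; host and port are placeholders, and the server side needs Together API credentials configured:

```python
import requests

payload = {
    "miner_responses": [{"compressed_context": "A senior French civil servant was arrested ..."}],
    "task_data": {
        "challenge_questions": ["Who was arrested?"],
        "challenge_answers": ["Benoit Quennedey"],
        "positive_chunks": ["arrested on suspicion of spying for North Korea"],
        "negative_chunks": ["the launch schedule of a new smartphone"],
    },
    "target_model": "meta-llama/Llama-3.1-8B-Instruct",
    "criterias": ["accuracy"],
}

resp = requests.post("http://localhost:8000/get_metrics", json=payload)
print(resp.json())  # {"metrics": {"accuracy": [ ... ]}}
```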
5573
5574
5575
5576---
5577File: /tests/test_miner_backend.py
5578---
5579
5580import requests
5581import argparse
5582from neural_condense_core.common.base64 import base64_to_ndarray
5583
5584# Default values for the base URL and port
5585DEFAULT_HOST = "localhost"
5586DEFAULT_PORT = 8080
5587DEFAULT_API_PATH = "/condense"
5588
5589
5590def get_args():
5591    """
5592    Function to parse command-line arguments for test configuration.
5593    """
5594    parser = argparse.ArgumentParser(description="Test API Endpoints.")
5595    parser.add_argument(
5596        "--host", type=str, default=DEFAULT_HOST, help="API host (default: localhost)"
5597    )
5598    parser.add_argument(
5599        "--port", type=int, default=DEFAULT_PORT, help="API port (default: 8080)"
5600    )
5601    parser.add_argument(
5602        "--api-path",
5603        type=str,
5604        default=DEFAULT_API_PATH,
5605        help="API path (default: /condense)",
5606    )
5607
5608    parser.add_argument(
5609        "--target-model",
5610        type=str,
5611        default="Condense-AI/Mistral-7B-Instruct-v0.1",
5612    )
5613
5614    args, _ = parser.parse_known_args()  # Avoid conflict with pytest's arguments
5615    return args
5616
5617
5618# Get arguments from the command line
5619args = get_args()
5620
5621# Construct the base URL using the provided arguments
5622BASE_URL = f"http://{args.host}:{args.port}{args.api_path}"
5623
5624
5625def get_api_url():
5626    """
5627    Function to provide the full API URL based on the host, port, and path.
5628    """
5629    return BASE_URL
5630
5631
5632def test_miner_api():
5633    """
5634    Test the prediction endpoint by sending a valid context and model request.
5635    """
5636    api_url = get_api_url()
5637
5638    payload = {
5639        "context": "This is a long test context that needs to be compressed.",
5640        "target_model": args.target_model,
5641    }
5642
5643    response = requests.post(api_url, json=payload)
5644
5645    if response.status_code != 200:
5646        raise Exception(f"Expected status code 200 but got {response.status_code}")
5647
5648    data = response.json()
5649
5650    if "compressed_tokens_base64" not in data:
5651        raise Exception("Response should contain compressed_tokens_base64.")
5652
5653    compressed_tokens = base64_to_ndarray(data["compressed_tokens_base64"])
5654
5655    seq_len, hidden_size = compressed_tokens.shape
5656
5657    print(f"Compressed tokens shape: {seq_len} x {hidden_size}")
5658
5659    print("API test passed successfully!")
5660
5661
5662
5663---
5664File: /tests/test_synthesizer.py
5665---
5666
5667from transformers import AutoTokenizer
5668from neural_condense_core.validator_utils.synthesizing import ChallengeGenerator
5669import json
5670import os
5671
5672os.makedirs("tmp", exist_ok=True)
5673
5674tokenizer = AutoTokenizer.from_pretrained("unsloth/Mistral-7B-Instruct-v0.2")
5675
5676tasks = [
5677    "question_answering",
5678    "causal_conversation",
5679    "reconstruct_conversation",
5680    "trivial_qa_conversation",
5681]
5682
5683challenge_generator = ChallengeGenerator()
5684
5685for task in tasks:
5686    challenge = challenge_generator.generate_challenge(tokenizer, task=task)
5687    json.dump(challenge.deserialize(), open(f"tmp/challenge_{task}.json", "w"))
5688
5689
5690
5691---
5692File: /tests/test_synthetic.py
5693---
5694
5695import time
5696import json
5697import numpy as np
5698from tqdm import tqdm
5699from transformers import AutoTokenizer
5700from neural_condense_core import validator_utils
5701import traceback
5702import asyncio
5703
5704
5705def benchmark_challenger(
5706    n_iterations: int = 10,
5707    max_characters: int = 10000,
5708    model_name: str = "Condense-AI/Mistral-7B-Instruct-v0.2",
5709):
5710    """
5711    Benchmark the Challenger model's response times and dataset creation for various tasks.
5712
5713    Args:
5714        n_iterations (int): Number of iterations per task to perform. Defaults to 10.
5715        max_characters (int): Maximum character limit for context in each task. Defaults to 10000.
5716        model_name (str): The name of the model to use for tokenization. Defaults to "Condense-AI/Mistral-7B-Instruct-v0.2".
5717
5718    Returns:
5719        dict: Summary of benchmark results including average time per task and statistics on context length.
5720    """
5721    # Load tokenizer and initialize Challenger instance
5722    if model_name == "universal":
5723        tokenizer = AutoTokenizer.from_pretrained("gpt2")
5724    else:
5725        tokenizer = AutoTokenizer.from_pretrained(model_name)
5726    challenger = validator_utils.synthesizing.ChallengeGenerator(None)
5727
5728    # Define task types and initialize logs
5729    tasks = [
5730        "question_answering",
5731        # "causal_conversation",
5732        # "reconstruct_conversation",
5733        # "trivial_qa_conversation",
5734    ]
5735    time_logs = {task: 0 for task in tasks}
5736    error_count = 0
5737    dataset_items = []
5738    context_lengths = []
5739    token_counts = []
5740
5741    # Start progress bar for total iterations
5742    total_iterations = n_iterations * len(tasks)
5743    pbar = tqdm(total=total_iterations, desc="Benchmarking", unit="task")
5744
5745    for i in range(n_iterations):
5746        for task in tasks:
5747            try:
5748                start_time = time.time()
5749
5750                # Generate protocol using Challenger
5751                protocol = asyncio.run(
5752                    challenger.generate_challenge(
5753                        model_name=model_name,
5754                        task=task,
5755                        max_context_length_in_chars=max_characters,
5756                    )
5757                )
5758
5759                # Record details of the generated sample
5760                item = {
5761                    "task": task,
5762                    "id": i,
5763                    "data": protocol.validator_payload,
5764                    "model_id": model_name,
5765                    "max_characters": max_characters,
5766                }
5767
5768                # Track time taken for task
5769                time_logs[task] += time.time() - start_time
5770
5771                # Store context length and token count for analysis
5772                context_lengths.append(
5773                    len(item["data"]["task_data"]["formatted_context"])
5774                )
5775                tokens = tokenizer.encode(
5776                    item["data"]["task_data"]["formatted_context"]
5777                )
5778                token_counts.append(len(tokens))
5779
5780                # Add item to dataset items
5781                dataset_items.append(item)
5782
5783            except Exception as e:
5784                traceback.print_exc()
5785                print(f"Error during task '{task}' at iteration {i}: {e}")
5786                error_count += 1
5787                continue
5788
5789            # Update progress bar
5790            pbar.update(1)
5791
5792    # Close progress bar
5793    pbar.close()
5794
5795    # Calculate average processing time per task
5796    avg_time_logs = {
5797        task: total_time / n_iterations for task, total_time in time_logs.items()
5798    }
5799    error_rate = error_count / total_iterations
5800
5801    # Display benchmark summary
5802    print("\nBenchmark Summary:")
5803    print(f"Error count: {error_count}")
5804    print(f"Error rate: {error_rate:.2%}")
5805    print("Average processing times (seconds):", avg_time_logs)
5806
5807    # Analyze context lengths and tokens
5808    context_lengths = np.array(context_lengths)
5809    token_counts = np.array(token_counts)
5810    mean_length = context_lengths.mean()
5811    std_length = context_lengths.std()
5812    mean_tokens = token_counts.mean()
5813    std_tokens = token_counts.std()
5814
5815    print("\nContext length statistics:")
5816    print(f"Mean: {mean_length:.2f} characters")
5817    print(f"Standard Deviation: {std_length:.2f} characters")
5818    print("\nToken count statistics:")
5819    print(f"Mean: {mean_tokens:.2f} tokens")
5820    print(f"Standard Deviation: {std_tokens:.2f} tokens")
5821
5822    # Save dataset items to JSON file
5823    with open("synthetic_samples.json", "w") as file:
5824        json.dump(dataset_items, file)
5825
5826    # Return expanded summary of results
5827    return {
5828        "error_count": error_count,
5829        "error_rate": error_rate,
5830        "avg_time_per_task": avg_time_logs,
5831        "context_length_mean": mean_length,
5832        "context_length_std": std_length,
5833        "token_count_mean": mean_tokens,
5834        "token_count_std": std_tokens,
5835    }
5836
5837
5838# Run benchmark
5839benchmark_results = benchmark_challenger(
5840    n_iterations=100,
5841    max_characters=40000,
5842    model_name="universal",
5843)
5844
5845print("\nBenchmarking completed. Results:", benchmark_results)
5846
5847
5848
5849---
5850File: /tests/upload_synthetic_dataset.py
5851---
5852
5853from datasets import Dataset
5854import pandas as pd
5855import json
5856
5857
5858data = json.load(open("synthetic_samples.json"))
5859
5860dataset = Dataset.from_pandas(pd.DataFrame(data))
5861
5862dataset.push_to_hub("Condense-AI/synthetic-samples-v0.1")
5863
5864
5865
5866---
5867File: /auto_update.sh
5868---
5869
5870#!/bin/bash
5871
5872# Function to update repository
5873update_repo() {
5874    echo "Checking for updates..."
5875    
5876    # Fetch latest changes without merging
5877    git fetch origin
5878
5879    # Get current and remote hash
5880    LOCAL_HASH=$(git rev-parse HEAD)
5881    REMOTE_HASH=$(git rev-parse origin/main)
5882
5883    if [ "$LOCAL_HASH" != "$REMOTE_HASH" ]; then
5884        echo "Updates available. Updating repository..."
5885        
5886        # Stash any local changes
5887        git stash
5888        git checkout main
5889        # Pull latest changes
5890        git pull origin main
5891        git reset --hard origin/main
5892        
5893        # Reinstall dependencies
5894        uv sync --prerelease=allow
5895        
5896        pm2 restart condense_validator_backend
5897        pm2 restart condense_validator
5898
5899        # check if condense_validator is errored, if yes, restart
5900        if [ "$(pm2 jlist | jq '.[] | select(.name=="condense_validator") | .pm2_env.status' -r)" == "errored" ]; then
5901            echo "condense_validator is in error state, restarting..."
5902            pm2 restart condense_validator
5903        fi
5904        
5905        echo "Update completed successfully!"
5906    else
5907        echo "Repository is up to date!"
5908    fi
5909}
5910
5911# Run the update function
5912while true; do
5913    update_repo
5914    sleep 1800  # Sleep for 30 minutes (1800 seconds)
5915done
5916
5917
5918
5919---
5920File: /README.md
5921---
5922
5923<div align="center">
5924<picture>
5925    <source srcset="./assets/images/condense-main.png">
5926    <img src="./assets/images/condense-main.png" alt="Neural Condense Subnet">
5927
5928</picture>
5929</div>
5930
5931<div align="center">
5932
5933<pre>
5934
5935 [CONDENSE AI block-letter banner]
5942</pre>
5943
5944</div>
5945
5946<div align="center">
5947
5948<h2>Explore Our Ecosystem</h2>
5949
5950| Component                                | Link                                                              |
5951|------------------------------------------|-------------------------------------------------------------------|
5952| **Condense-AI & API Document**           | [Visit Condense-AI](https://condenses.ai)                         |
5953| **API Library**                          | [Explore API Library](https://github.com/condenses/neural-condense) |
5954| **Organic Forwarder For Validators**     | [Check Organic Forwarder](https://github.com/condenses/subnet-organic) |
5955| **Miner Leaderboard & Statistics**       | [View Miner Dashboard](https://dashboard.condenses.ai) or [Wandb Logger](https://wandb.ai/toilaluan/Neural-Condense-Subnet)           |
5956
5957</div>
5958
5959---
5960
5961## Changelogs
5962- (25/11/2024) Version 0.0.2 update: added condensing activation layers, switched from RESTful API transfer to distributed storage, and emissions are now allocated only to the top 30% of miners.
5963
5964
5965## Key Features:
5966
5967### Subnet as an Acceleration Adapter for LLM Inference
5968- **Seamless Integration**: Effortlessly integrates with LLM inference engines such as transformers and vLLM.
5969- **Token Compression**: The subnet API compresses long sequences of natural language tokens into soft tokens.
5970- **Decentralized Network**: The subnet is a decentralized network that allows miners to contribute to the compression process.
5971- **Tiered System**: The subnet has a tiered system, with a research tier for experimentation and an inference tier for production-scale use. Incentive distribution is split across tiers.
5972- **Benchmarking and Validation**: The subnet owner defines synthetic metrics to benchmark miners' performance, ensuring quality and efficiency.
5973
5974<div align="center">
5975<img src="https://github.com/user-attachments/assets/87060854-57bd-4b9b-9b06-b1edf87d182a" alt="condense" width="75%">
5976</div>
5977
5978### Node Tiers
5979
5980
5981| **Tier**       | **Purpose**                           | **Context Size**         | **Incentive Percentage**     | **Supporting Models**               |
5982|----------------|---------------------------------------|---------------------------|---------------|--------------------------------------|
5983| `research`     | Optimize text-to-kv cache for a specific model | Up to 15000 characters                  | `60%`  | `mistralai/Mistral-7B-Instruct-v0.2` |
5984| `universal`     | Compress text representation for various models | Up to 15000 characters                  | `40%`  | `meta-llama/Llama-3.1-8B-Instruct` |
5985
5986
5987*Supported models can be added flexibly based on need.*
5988
5989<div align="center">
5990<img src="https://github.com/user-attachments/assets/b661ed8e-fc8a-45e3-ad78-6001dae93b21" alt="realese-circle" width="75%">
5991</div>
5992
5993
5994## Documentation
5995- **Setup for miners**: [Miner Setup](./docs/miner.md)
5996- **Setup for validators**: [Validator Setup](./docs/validator.md)
5997- **Mechanism**: [Mechanism](./docs/mechanism.md)
5998