Bitsec (subnet 60)

Vulnerability History

| Date | High Risk | Low Risk |
|------|-----------|----------|
| 2024-12-16 | 3 | 0 |

Audit Report Details

Lines of Code: 5998
Open: 7
Resolved: 0
🚨 High Risk Vulnerabilities
⚠️ Low Risk Vulnerabilities

Vulnerable Code:

1---
2File: /docs/mechanism.md
3---
4
5
6# DEPRECATED
7## πŸ“Š Score Calculation Overview
8
9This section provides an overview of how challenge items are created for training a token compressor for large language models (LLMs), the purpose of each task, and how scores are calculated. The goal is to compress long contexts into far fewer tokens while preserving the integrity and usefulness of the output.
10
111. [Challenge Item Creation](#challenge-item-creation)
12 - [Datasets Used](#datasets-used)
13 - [Challenge Generation Process](#challenge-generation-process)
142. [Purpose of Each Task](#purpose-of-each-task)
15 - [Question Answering Task](#question-answering-task)
16 - [Continual Conversation Task](#continual-conversation-task)
17 - [Reconstruction Task](#reconstruction-task)
183. [Score Calculation](#score-calculation)
19 - [Criteria Used](#criteria-used)
20 - [Scoring Methods](#scoring-methods)
21 - [Rating Groups](#rating-groups)
22 - [Tier System](#tier-system)
23
24First, credit to [In-context Autoencoder](https://github.com/getao/icae) for the inspiration and for the original paper on token compression.
25
26<div align="center">
27<img src="../assets/images/icae.png" alt="icae-method" width="100%">
28</div>
29
30### Challenge Item Creation for Synthetic
31
32**Preview:** https://huggingface.co/datasets/Condense-AI/synthetic-samples-v0.1
33
34<div align="center">
35<img src="https://github.com/user-attachments/assets/0734407e-f967-4b67-9a49-1da7c2c752f6" alt="preview-synth-dataset" width="75%">
36</div>
37
38This dataset is generated by the subnet's challenge-generation pipeline.
39To reproduce it, please run:
40```bash
41git clone https://github.com/condenses/neural-condense-subnet
42cd neural-condense-subnet
43pip install -e .
44python tests/test_synthetic.py
45```
46
47Console Output:
48```
49Benchmark Summary:
50Error count: 0
51Error rate: 0.00%
52Average processing times (seconds): {'question_answering': 0.024009417057037352, 'reconstruction': 0.0036168951988220215, 'continual_conversation': 0.004922831296920776}
53
54Context length statistics:
55Mean: 10116.86 characters
56Standard Deviation: 86.42 characters
57```
58
59#### Datasets Used
60
61The challenge items are created using a combination of datasets to simulate real-world scenarios:
62
63| Dataset Type | Datasets Used |
64|--------------|---------------|
65| Context Seeds | FineWeb-Pro and other context datasets loaded via `load_context_datasets()` |
66| Conversation Seeds | Infinity Instruct, Open Math Instruct, and other instruction datasets loaded via `load_instruct_datasets()` |
67| Cached Examples | Condense-AI/subnet-synthetic-dataset-v0.2 |
68
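For a quick look at the cached examples, the dataset can be loaded directly from the Hugging Face Hub. Below is a minimal sketch using the `datasets` library (not part of the subnet code); the `train` split name is an assumption.

```python
# Minimal sketch: peek at the cached challenge examples listed above.
# The "train" split name is an assumption.
from datasets import load_dataset

cached = load_dataset("Condense-AI/subnet-synthetic-dataset-v0.2", split="train")
print(len(cached), cached.column_names)
```
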
69#### Challenge Generation Process
70
71The challenge generation system consists of three main components:
72
731. **ConvoGenerator**: Handles the generation of conversations and QA pairs using an LLM API
742. **Scheduler**: Manages queues of pre-generated challenges using Redis
753. **ChallengeGenerator**: Creates the final challenge items in the correct protocol format
76
77The process involves:
78
791. **Queue Management**:
80 - Pre-fills Redis queues with cached examples from subnet-synthetic-dataset-v0.2
81 - Continuously refreshes queues to maintain a steady supply of challenges
82 - Monitors queue sizes and generates new items when needed
83
842. **Challenge Types**:
85 - `question_answering`: QA pairs embedded within conversations
86 - `causal_conversation`: Natural flowing conversations
87 - `reconstruct_conversation`: Tests ability to reformat conversations
88 - `trivial_qa_conversation`: Fill-in-the-blank style questions
89
903. **Protocol Creation** (a minimal sketch follows this list):
91 - Inserts special tokens (`<START-ACTIVATE-TOKEN>`, `<END-ACTIVATE-TOKEN>`)
92 - Formats context, activation prompt, and expected completion
93 - Applies chat template using the provided tokenizer
94
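To make the protocol-creation step concrete, here is a minimal, hypothetical sketch. Only the special tokens and the use of the tokenizer's chat template are taken from the description above; the function name, token placement, and message layout are assumptions, and the real `ChallengeGenerator` may differ.

```python
# Hypothetical sketch of the protocol-creation step (illustrative only).
def build_challenge_prompt(context: str, activation_prompt: str, tokenizer) -> str:
    # Wrap the activation prompt in the special tokens mentioned above.
    user_content = (
        f"{context}\n"
        f"<START-ACTIVATE-TOKEN>{activation_prompt}<END-ACTIVATE-TOKEN>"
    )
    messages = [{"role": "user", "content": user_content}]
    # Standard Hugging Face tokenizer API for applying a chat template.
    return tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
```
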
95### Purpose of Each Task
96
97The system supports four main task types:
98
99#### Question Answering Task
100
101**Objective**: Test comprehension of context and ability to answer specific questions.
102
103- **Process**:
104 - Generates QA pairs from context using LLM
105 - Embeds QA within larger conversations
106 - Number of QA pairs controlled by `n_qa_per_context` (default: 4)
107
108#### Causal Conversation Task
109
110**Objective**: Test ability to maintain conversation flow and context.
111
112- **Process**:
113 - Uses conversation seeds from instruction datasets
114 - Generates additional turns using LLM
115 - Controls conversation length with `max_turns` parameter
116
117#### Reconstruction Task
118
119**Objective**: Test ability to reformat conversations while maintaining content.
120
121- **Process**:
122 - Takes existing conversation
123 - Requires reformatting into specific template:
124 ```
125 [Role]: [Message]
126 Example:
127 - User: Hello, how are you?
128 - Assistant: I am fine, thank you.
129 ```
130
131#### Trivial QA Task
132
133**Objective**: Test basic comprehension with fill-in-the-blank questions.
134
135- **Process**:
136 - Selects sentence from conversation
137 - Creates blank with surrounding context
138 - Expects exact completion of missing text
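
As a purely illustrative example (the field names below are hypothetical, following the context / activation prompt / expected completion structure described above), a trivial QA item could look like:

```python
# Purely illustrative; the actual protocol objects may use different fields.
trivial_qa_item = {
    "context": "User: Which planet is known as the Red Planet?\n"
               "Assistant: Mars is known as the Red Planet.",
    "activation_prompt": "Fill in the blank: ____ is known as the Red Planet.",
    "expected_completion": "Mars",
}
```
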
139### Score Calculation
140
141#### Criteria Used
142
143Scores are calculated based on perplexity and ELO ranking in batched competitions:
144
145| Criteria | Description |
146|----------|-------------|
147| Perplexity | Measures how well the model predicts the expected completion tokens, with lower values indicating better performance |
148| ELO Rating | A relative rating system that adjusts based on performance against other miners in the batch |
149
150#### Scoring Method
151
1521. **Perplexity Calculation**:
153 ```python
154 # Lower perplexity = better performance
155 perplexity = exp(-1/N * Ξ£ log P(token_i))
156 ```
157
1582. **Batch Competition**:
159 - Miners are grouped into batches of size `BATCH_SIZE` (default: 4)
160 - Each batch competes on the same challenge
161 - Performance is relative within the batch
162 - Groups are formed based on ELO ratings to ensure fair competition
163
1643. **ELO Rating Updates** (a worked sketch follows this list):
165 ```python
166 # K-factor varies based on rating tier:
167 # - Beginner (0-800): K=24
168 # - Intermediate (800-1600): K=16
169 # - Advanced (1600-3000): K=4
170
171 expected_score = 1 / (1 + 10 ** ((opponent_rating - player_rating) / 400))
172 new_rating = current_rating + K * (actual_score - expected_score)
173 ```
174
1754. **Accelerator Rewards**:
176 Additional rewards are calculated based on:
177 ```python
178 compress_rate = 1 - len(compressed_tokens) / max_condensed_tokens
179 process_time = 1 - process_time / timeout
180 accelerator = max(0, (compress_rate + process_time) / 2)
181 ```
182
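As a worked illustration of the batch competition and ELO update described above, the snippet below is a minimal, self-contained sketch using the tiered K-factors; it is not the subnet's actual implementation.

```python
# Minimal sketch of the tiered ELO update described above (illustrative only).
def k_factor(rating: float) -> int:
    if rating < 800:    # Beginner
        return 24
    if rating < 1600:   # Intermediate
        return 16
    return 4            # Advanced

def update_elo(player: float, opponent: float, actual_score: float) -> float:
    expected = 1 / (1 + 10 ** ((opponent - player) / 400))
    return player + k_factor(player) * (actual_score - expected)

# Example: a 1000-rated miner beating a 1200-rated miner in its batch
# gains about 12 points (expected score ~0.24, K = 16).
print(round(update_elo(1000, 1200, actual_score=1.0), 1))  # ~1012.2
```
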
183#### Rating Groups
184
185Miners are divided into ELO rating groups that affect their K-factor:
186
187| Group | Rating Range | K-factor |
188|-------|-------------|----------|
189| Beginner | 0-800 | 24 |
190| Intermediate | 800-1600 | 16 |
191| Advanced | 1600-3000 | 4 |
192
193#### Tier System
194
195Each tier has specific configuration and incentive percentages:
196
197| Tier | Incentive % | Max Tokens | Context Length |
198|------|-------------|------------|----------------|
199| Research | 100% | 1024 | 10,000 chars |
200| Inference 0 | 0% | 1024 | 15,000 chars |
201| Inference 1 | 0% | 2048 | 20,000 chars |
202
203### Configuration
204
205The challenge system is configured through constants:
206
207```python
208SYNTHETIC_TASK_CONFIG = [
209 {
210 "task": "causal_conversation",
211 "criterias": ["perplexity"],
212 "rewarding_frequency": 1,
213 "weight": 1,
214 },
215 # ... other tasks ...
216]
217```
218
219Each tier has specific configuration for context length and token limits:
220
221```python
222TIER_CONFIG = {
223 "research": {
224 "max_context_length_in_chars": 10000,
225 "max_condensed_tokens": 1024,
226 "min_condensed_tokens": 128,
227 # ... other settings ...
228 },
229 # ... other tiers ...
230}
231```
232
233### Summary
234
235The token compression challenge is designed to rigorously test the token compressor's effectiveness in various scenarios by:
236
237- Using diverse datasets to simulate real-world inputs.
238- Creating tasks that challenge different aspects of comprehension and context retention.
239- Calculating scores based on language-model metrics (e.g., perplexity) and ELO ratings.
240
241By understanding how the challenge items are created, the purpose behind each task, and the scoring methodology, developers can better train and evaluate token compressors for LLMs.
242
243
244
245
246---
247File: /docs/miner.md
248---
249
250<div align="center">
251
252# ⚑ Miner Documentation
253
254</div>
255
256## Minimum Requirements for Baseline
257- GPU with at least 24GB of VRAM (RTX 4090, A6000, A100, H100, etc.) to run the baseline model
258- CUDA and the NVIDIA driver installed
259- PM2 installed (see [Guide to install PM2](./pm2.md))
260- Cloud storage set up for uploading miner outputs. Here are some recommended options:
261 - `Huggingface Hub` (free but has some limitations)
262 - `AWS S3`
263 - `minio` (open-source version of AWS S3) (see [Guide to install MINIO](./minio.md))
264 - `Google Cloud Storage`
265 - `Azure Blob Storage`
266
267## What does a Miner do?
268
269A miner is a node responsible for condensing a long text into a much shorter sequence of condensed tokens & activations. These condensed tokens & activations are then fed to Large Language Models like Llama, Gemma, Mistral, etc.
270
271## How does a Miner work?
272
273We (the subnet owner) provide some baselines for miners, but miners have to research their own algorithms to stay competitive. We also aim to bring the latest SOTA algorithms to miners as soon as possible.
274
275So there are a few things a miner has to do:
276
2771. Select a tier: there are two tiers, `research` and `universal`. You can see the details in the miner's config file, `neural_condense_core/constants.py`, or in the [README.md](../README.md).
278
2792. Implement your own algorithm or pick one of our baseline algorithms. You can find the baseline algorithms in the `services/miner_backend/` folder.
280The schema of the backend API is very simple: the `Validator` sends you a dictionary with `context: str` and `target_model: str` (a minimal backend sketch is shown after this list).
281- For the `research` tier:
282The miner runs its own backend, which produces the KV cache of the target LLM. The miner then uploads the KV cache to `minio` storage and returns the `minio` path to the `Validator`.
283 - `past_key_values: Tuple[Tuple[torch.FloatTensor]]` is the format of the KV cache. It is loaded into the LLM using `DynamicCache.from_legacy_cache(past_key_values)` from `transformers`.
284- For the `universal` tier:
285The miner runs its own backend, which produces a compressed text representation and returns it to the `Validator` as an attribute of `SynapseResponse`.
286
287
2883. Once you have a competitive backend, benchmark it to make sure it meets the speed and load requirements defined for the tier. **Our baselines require a GPU**.
289
2904. Register your slot and start mining.
291
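The following is a minimal, hypothetical sketch of a universal-tier backend, only to illustrate the request/response shape described in step 2. The endpoint path, response field, and the truncation used as a stand-in for compression are assumptions; the actual apps in `services/miner_backend/` may differ.

```python
# Hypothetical universal-tier backend sketch (illustrative only).
from fastapi import FastAPI
from pydantic import BaseModel


class CondenseRequest(BaseModel):
    context: str
    target_model: str


def create_app(algorithm: str = "llmlingua-2") -> FastAPI:
    app = FastAPI()

    @app.post("/condense")
    async def condense(request: CondenseRequest):
        # A real miner would run its compression algorithm here instead of truncating.
        compressed_context = request.context[: len(request.context) // 4]
        return {"compressed_context": compressed_context, "algorithm": algorithm}

    return app
```
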
292## Steps to set up a Miner
293
294### 1. Clone the repository
295```bash
296git clone https://github.com/condenses/neural-condense-subnet
297cd neural-condense-subnet
298```
299
300### 2. Install the dependencies
301```bash
302pip install uv
303uv sync --prerelease=allow
304. .venv/bin/activate
305. scripts/install_redis.sh
306```
307
308### 3. Configure your wallet, backend, etc. Below is just an example:
309
310**Parameters**
311- `--miner.tier` - The selected tier; it should match your backend.
312- `--netuid` - The network UID of the subnet.
313- `--subtensor.network` - The Subtensor network to connect to. `finney` for the mainnet. `test` for the testnet.
314- `--wallet.name` - The name of the wallet to use.
315- `--wallet.hotkey` - The hotkey of the wallet to use.
316- `--axon.port` - The port to be posted to metagraph.
317- `--miner.backend_host` - The host of the miner backend for condensing.
318- `--miner.backend_port` - The port of the miner backend for condensing.
319
320**Important**: `axon_port` must be opened in your firewall.
321
322**Define bash variable in your terminal**
323```bash
324miner_tier="research" # or "universal"
325miner_wallet="my_wallet"
326miner_hotkey="my_hotkey"
327miner_backend_host="localhost"
328miner_backend_port=8080
329miner_axon_port=12345
330miner_netuid=47
331miner_subtensor_network="finney"
332```
333
334### 4. Run the miner backend
335
336#### 4.a. Research tier:
337You have to collect `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`, `MINIO_BUCKET`, and `MINIO_SERVER` from the MinIO setup (see [minio.md](./minio.md)).
338
339There are three compression algorithms available:
340- `kvpress`: Basic KV-cache compression
341- `soft_token`: Soft token compression (requires additional model)
342- `activation_beacon`: Activation beacon compression
343
344```bash
345export MINIO_ACCESS_KEY="your_minio_access_key"
346export MINIO_SECRET_KEY="your_minio_secret_key"
347export MINIO_BUCKET="condense"
348export MINIO_SERVER="your_minio_server"
349
350# Choose one of the following commands based on your preferred algorithm:
351
352# For KVPress compression:
353pm2 start python --name condense_miner_backend \
354-- -m gunicorn "services.miner_backend.app:create_app('kvpress')" \
355--timeout 120 \
356--bind 0.0.0.0:$miner_backend_port
357
358# For Soft Token compression:
359pm2 start python --name condense_miner_backend \
360-- -m gunicorn "services.miner_backend.app:create_app('soft_token')" \
361--timeout 120 \
362--bind 0.0.0.0:$miner_backend_port
363
364# For Activation Beacon compression:
365pm2 start python --name condense_miner_backend \
366-- -m gunicorn "services.miner_backend.app:create_app('activation_beacon')" \
367--timeout 120 \
368--bind 0.0.0.0:$miner_backend_port
369```
370
371**Note**:
372- If using `soft_token` algorithm, you can train your own model using our prepared trainer at [Condense-Trainer](https://github.com/condenses/condense-trainer).
373- Each algorithm has different GPU memory requirements:
374 - `kvpress`: ~24GB VRAM
375 - `soft_token`: ~24GB VRAM + additional memory for condenser model
376 - `activation_beacon`: ~24GB VRAM
377
378You can also run the backend directly without PM2 for testing:
379```bash
380python -m gunicorn "services.miner_backend.app:create_app('kvpress')" --bind 0.0.0.0:8080
381```
382#### 4.b. Universal tier:
383You can check the default `llmlingua-2` model in the `services.miner_backend.universal_app` folder, and develop your own model to further improve the performance.
384```bash
385pm2 start python --name miner_universal_backend \
386 -- -m gunicorn "services.miner_backend.universal_app:create_app('llmlingua-2')" \
387 --timeout 120 \
388 --bind 0.0.0.0:8080
389```
390
391
392### 5. Run the mining script
393```bash
394pm2 start python --name condense_miner \
395-- -m neurons.miner \
396--netuid $miner_netuid \
397--subtensor.network $miner_subtensor_network \
398--wallet.name $miner_wallet \
399--wallet.hotkey $miner_hotkey \
400--miner.tier $miner_tier \
401--miner.backend_host $miner_backend_host \
402--miner.backend_port $miner_backend_port \
403--axon.port $miner_axon_port
404```
405
406
407---
408File: /docs/minio.md
409---
410
411### Setting Up MinIO Storage for Uploading Condensed Responses
412
413#### Overview
414
415The miner will generate a URL and share it with the validator. The validator will use this URL to download the condensed response.
416
417To facilitate this, a public storage solution is needed to ensure the validator can access the file. This guide demonstrates how to use MinIO for this purpose.
418
419For full instructions, refer to the [MinIO documentation](https://min.io/docs/minio/linux/operations/installation.html).
420
421Other storage options like AWS S3 or GCP Storage are also viable, as long as the validator can download the file from the provided URL.
422
423For a quick setup, you can use [Railway](https://railway.app/). Note, however, that it is a centralized service and may come with certain limitations. Use the provided template for deployment:
424
425[![Deploy MinIO](https://railway.com/button.svg)](https://railway.app/template/lRrxfF?referralCode=xpVB_C)
426
427**Important:** After setting up MinIO, you’ll need the following credentials:
428- `MINIO_ACCESS_KEY`
429- `MINIO_SECRET_KEY`
430- `MINIO_BUCKET`
431- `MINIO_SERVER`
432
433These details will be essential for configuring the miner.
434
435---
436
437### Step-by-Step Setup on a New Machine
438
4391. **Install the MinIO server**
440 Follow the setup instructions here: [Deploy MinIO on a Single Node](https://min.io/docs/minio/linux/operations/install-deploy-manage/deploy-minio-single-node-single-drive.html).
441 Ensure the MinIO server port is exposed publicly by including `--address 0.0.0.0:$open_port`.
442
443 - If running MinIO on the same machine, set `MINIO_SERVER` to `http://localhost:<open_port>`.
444 - If hosted on a remote machine, use `http://<remote_machine_ip>:<open_port>`.
445
4462. **Create a bucket**
447 Name the bucket `condense_miner` and configure it to be public.
448
4493. **Generate credentials**
450 Create the following credentials:
451 - `MINIO_ACCESS_KEY`
452 - `MINIO_SECRET_KEY`
453
4544. **Retrieve the server address**
455 Record the `MINIO_SERVER` address to be used for miner setup.
456
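With these credentials in place, the sketch below shows one way a miner could upload a condensed output and build the public URL a validator downloads from. It uses the official `minio` Python client; the object name and public-URL format are assumptions, and the bucket is assumed to be public.

```python
# Minimal sketch (not from the subnet repo): upload a file and print its public URL.
import os
from minio import Minio

server = os.environ["MINIO_SERVER"]
client = Minio(
    server.split("://", 1)[-1],              # host:port with the scheme stripped
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=server.startswith("https"),
)
bucket = os.environ["MINIO_BUCKET"]
client.fput_object(bucket, "condensed_kv_cache.npy", "condensed_kv_cache.npy")
print(f"{server}/{bucket}/condensed_kv_cache.npy")  # URL returned to the validator
```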
457
458---
459File: /docs/pm2.md
460---
461
462# Guide to install PM2
463
464## What is PM2?
465
466PM2 is a process manager for Node.js applications. It is a simple and easy-to-use tool that allows you to keep your Node.js applications alive and running, even when they crash. PM2 also provides a built-in load balancer, so you can easily scale your applications across all available CPUs.
467
468## Installation
469
4701. Install Node Version Manager (NVM) by running the following command:
471
472```bash
473curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.0/install.sh | bash
474```
475
4762. Restart your terminal and install Node.js by running the following command:
477
478```bash
479export NVM_DIR="$HOME/.nvm"
480nvm install 22
481```
482
4833. Install PM2 by running the following command:
484
485```bash
486npm i pm2 -g
487```
488
4894. Restart your terminal and verify the installation by running the following command:
490
491```bash
492pm2 --version
493```
494
495
496---
497File: /docs/validator.md
498---
499
500<div align="center">
501
502# ⚑ Validator Documentation
503
504</div>
505
506## Minimum Requirements
507- GPU with at least 80GB of VRAM (A100, H100, etc.) to run LLMs and Reward Model
508- 512GB of SSD storage
509- CUDA, NVIDIA Driver installed
510- Internet connection with at least 4Gbps
511- PM2 installed (see [Guide to install PM2](./pm2.md))
512- A TogetherAI API key: https://api.together.ai/settings/api-keys
513
514## What does a Validator do?
515
516- Sends synthetic requests & evaluates miners' performance using prepared tasks: autoencoder, question-answering, conversation, etc.
517- Forwards the Organic API if you want to sell your bandwidth to end-users.
518
519## Steps to set up a Validator
520
5211. Clone the repository
522```bash
523git clone https://github.com/condenses/neural-condense-subnet
524cd neural-condense-subnet
525```
526
5272. Install the dependencies
528```bash
529pip install uv
530uv sync --prerelease=allow
531. .venv/bin/activate
532. scripts/install_redis.sh
533```
534To test if Redis is working correctly, run `redis-cli ping` and it should return `PONG`.
535
536**Optional**
537- Login to Weights & Biases to use the logging feature
538```bash
539wandb login
540```
541
5423. Configure your wallet, backend host, and port. Below is just an example:
543
544**Parameters**
545- `--netuid` - The network UID of the subnet.
546- `--subtensor.network` - The Subtensor network to connect to. `finney` for the mainnet. `test` for the testnet.
547- `--wallet.name` - The name of the wallet to use.
548- `--wallet.hotkey` - The hotkey of the wallet to use.
549- `--axon.port` - The port to be posted to metagraph.
550- `--validator.score_backend.host` - The host of the validator backend for scoring.
551- `--validator.score_backend.port` - The port of the validator backend for scoring.
552- `--validator.gate_port` - The port the validator opens to forward requests from end-users to miners. It should be an open port in your firewall. It's optional.
553- `--validator.use_wandb` - Use Weights & Biases for logging. It's optional.
554
555**Important**: `axon_port` and `gate_port` must be opened in your firewall.
556
557**Define bash variable in your terminal**
558```bash
559val_wallet="my_wallet"
560val_hotkey="my_hotkey"
561val_backend_host="localhost"
562val_backend_port=8080
563val_universal_backend_host="localhost"
564val_universal_backend_port=8090
565val_axon_port=12345
566val_gate_port=12346
567val_netuid=47
568val_subtensor_network="finney"
569```
570
5714. Run the validator backend.
572***NOTE***: You have to run **both** the research and universal tier backends.
573To run the research tier backend, run:
574```bash
575pm2 start python --name condense_validator_research_backend \
576-- -m gunicorn services.validator_backend.scoring.app:app \
577--workers 1 \
578--bind $val_backend_host:$val_backend_port \
579--timeout 0
580```
581
582To run the universal tier backend, run:
583```bash
584export TOGETHER_API_KEY=your_together_api_key
585pm2 start python --name condense_validator_universal_backend \
586-- -m gunicorn services.validator_backend.universal_scoring.app:app \
587--workers 1 -k uvicorn.workers.UvicornWorker \
588--bind $val_universal_backend_host:$val_universal_backend_port \
589--timeout 0
590```
591
5925. Run the validator script
593```bash
594export HF_HUB_ENABLE_HF_TRANSFER=1
595pm2 start python --name condense_validator \
596-- -m neurons.validator \
597--netuid $val_netuid \
598--subtensor.network $val_subtensor_network \
599--wallet.name $val_wallet \
600--wallet.hotkey $val_hotkey \
601--axon.port $val_axon_port \
602--validator.score_backend.host $val_backend_host \
603--validator.score_backend.port $val_backend_port \
604--validator.universal_score_backend.host $val_universal_backend_host \
605--validator.universal_score_backend.port $val_universal_backend_port \
606--validator.use_wandb
607```
608
6096. Run the auto-update script; it will check for updates every 30 minutes
610```bash
611pm2 start auto_update.sh --name "auto_updater"
612```
613
6147. Run the Organic server to use the Organic API
615```bash
616pm2 start python --name condense_organic \
617-- -m services.validator_backend.organic.app:app \
618--netuid $val_netuid \
619--subtensor.network $val_subtensor_network \
620--wallet.name $val_wallet \
621--wallet.hotkey $val_hotkey \
622--axon.port $val_axon_port \
623--validator.gate_port $val_gate_port
624```
625
626
627
628---
629File: /neural_condense_core/base/__init__.py
630---
631
632from .miner import Miner as BaseMiner
633from .validator import Validator as BaseValidator
634
635__all__ = ["BaseMiner", "BaseValidator"]
636
637
638
639---
640File: /neural_condense_core/base/config.py
641---
642
643import bittensor as bt
644from argparse import ArgumentParser
645from ..constants import constants
646
647
648def add_common_config(parser: ArgumentParser):
649 parser.add_argument("--netuid", type=int, default=1, help="The chain subnet uid.")
650 parser.add_argument(
651 "--whitelist_uids",
652 type=str,
653 default=None,
654 help="The uids to whitelist. For testing purposes.",
655 )
656 bt.subtensor.add_args(parser)
657 bt.logging.add_args(parser)
658 bt.wallet.add_args(parser)
659 bt.axon.add_args(parser)
660 return parser
661
662
663def add_validator_config(parser: ArgumentParser):
664 parser.add_argument(
665 "--validator.gate_port",
666 type=int,
667 default=None,
668 help="The port of the validator gate server.",
669 )
670
671 parser.add_argument(
672 "--validator.score_backend.host",
673 type=str,
674 default="localhost",
675 help="The host of the score backend server.",
676 )
677 parser.add_argument(
678 "--validator.score_backend.port",
679 type=int,
680 default=8089,
681 help="The port of the score backend server.",
682 )
683 parser.add_argument(
684 "--validator.universal_score_backend.host",
685 type=str,
686 default="localhost",
687 help="The host of the universal score backend server.",
688 )
689 parser.add_argument(
690 "--validator.universal_score_backend.port",
691 type=int,
692 default=8090,
693 help="The port of the universal score backend server.",
694 )
695
696 parser.add_argument(
697 "--validator.organic_client_url",
698 type=str,
699 default=constants.ORGANIC_CLIENT_URL,
700 help="The URL of the organic client.",
701 )
702
703 parser.add_argument(
704 "--validator.report_url",
705 type=str,
706 default=constants.REPORT_URL,
707 help="The URL of the report server.",
708 )
709
710 parser.add_argument(
711 "--validator.use_wandb",
712 action="store_true",
713 help="Whether to use wandb for logging.",
714 )
715
716 return parser
717
718
719def add_miner_config(parser: ArgumentParser):
720 tier_names = list(constants.TIER_CONFIG.keys())
721 parser.add_argument(
722 "--miner.backend_host",
723 type=str,
724 default="localhost",
725 help="The host of the backend server.",
726 )
727 parser.add_argument(
728 "--miner.backend_port",
729 type=int,
730 default=8088,
731 help="The port of the backend server.",
732 )
733 parser.add_argument(
734 "--miner.tier",
735 choices=tier_names,
736 )
737 return parser
738
739
740
741---
742File: /neural_condense_core/base/miner.py
743---
744
745import os
746import argparse
747import bittensor as bt
748from typing import Tuple
749from .config import add_common_config, add_miner_config
750from ..protocol import Metadata
751
752
753class Miner:
754 def __init__(self):
755 self.config = self.get_config()
756 self.blacklist_fns = [self._blacklist_fn]
757 self.forward_fns = [self._forward_metadata]
758 self.setup_bittensor_objects()
759 self.metadata = {
760 "tier": self.config.miner.tier,
761 }
762
763 def get_config(self):
764 parser = argparse.ArgumentParser()
765 parser = add_miner_config(parser)
766 parser = add_common_config(parser)
767 config = bt.config(parser)
768 config.full_path = os.path.expanduser(
769 "{}/{}/{}/netuid_{}/{}".format(
770 config.logging.logging_dir,
771 config.wallet.name,
772 config.wallet.hotkey_str,
773 config.netuid,
774 "miner",
775 )
776 )
777 print(config)
778 os.makedirs(config.full_path, exist_ok=True)
779 return config
780
781 def setup_logging(self):
782 bt.logging.enable_default()
783 bt.logging.enable_info()
784
785 if self.config.logging.debug:
786 bt.logging.enable_debug()
787 if self.config.logging.trace:
788 bt.logging.enable_trace()
789 # Activate Bittensor's logging with the set configurations.
790 bt.logging(config=self.config, logging_dir=self.config.full_path)
791 bt.logging.info(
792 f"Running miner for subnet: {self.config.netuid} on network: {self.config.subtensor.network} with config:"
793 )
794 bt.logging.info(self.config)
795
796 def setup_bittensor_objects(self):
797 bt.logging.info("Setting up Bittensor objects.")
798 self.wallet = bt.wallet(config=self.config)
799 bt.logging.info(f"Wallet: {self.wallet}")
800 self.subtensor = bt.subtensor(config=self.config)
801 bt.logging.info(f"Subtensor: {self.subtensor}")
802 self.metagraph = self.subtensor.metagraph(self.config.netuid)
803 bt.logging.info(f"Metagraph: {self.metagraph}")
804 if self.wallet.hotkey.ss58_address not in self.metagraph.hotkeys:
805 bt.logging.error(
806 f"\nYour miner: {self.wallet} is not registered to chain connection: {self.subtensor} \nRun 'btcli register' and try again."
807 )
808 exit()
809 else:
810 self.my_subnet_uid = self.metagraph.hotkeys.index(
811 self.wallet.hotkey.ss58_address
812 )
813 bt.logging.info(f"Running miner on uid: {self.my_subnet_uid}")
814
815 def setup_axon(self):
816 self.axon = bt.axon(
817 wallet=self.wallet,
818 port=self.config.axon.port,
819 external_port=self.config.axon.external_port,
820 )
821 bt.logging.info("Attaching forward function to axon.")
822 for blacklist_fn, forward_fn in zip(self.blacklist_fns, self.forward_fns):
823 bt.logging.info(
824 f"Attaching blacklist_fn: {blacklist_fn} and forward_fn: {forward_fn}"
825 )
826 self.axon.attach(
827 forward_fn=forward_fn,
828 blacklist_fn=blacklist_fn,
829 )
830 bt.logging.info(
831 f"Serving axon on network: {self.config.subtensor.network} with netuid: {self.config.netuid}"
832 )
833 self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor)
834 bt.logging.info(f"Axon: {self.axon}")
835 bt.logging.info(f"Starting axon server on port: {self.config.axon.port}")
836 self.axon.start()
837
838 def _forward_metadata(self, synapse: Metadata) -> Metadata:
839 synapse.metadata = self.metadata
840 return synapse
841
842 def _blacklist_fn(self, synapse: Metadata) -> Tuple[bool, str]:
843 return False, "Always pass."
844
845
846
847---
848File: /neural_condense_core/base/validator.py
849---
850
851import os
852import asyncio
853import argparse
854import traceback
855import bittensor as bt
856import time
857import threading
858from .config import add_common_config, add_validator_config
859from abc import abstractmethod, ABC
860from ..constants import constants
861from ..logger import logger
862
863
864class Validator(ABC):
865 def __init__(self):
866 self.config = self.get_config()
867 print(self.config)
868 self.setup_logging()
869 self.setup_bittensor_objects()
870 self.last_update = 0
871 self.current_block = 0
872 self.uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address)
873 self.is_running = False
874 self.should_exit = False
875 self.setup_axon()
876 self.loop = asyncio.get_event_loop()
877
878 def get_config(self):
879 parser = argparse.ArgumentParser()
880 parser = add_validator_config(parser)
881 parser = add_common_config(parser)
882 config = bt.config(parser)
883 config.full_path = os.path.expanduser(
884 "{}/{}/{}/netuid{}/{}".format(
885 config.logging.logging_dir,
886 config.wallet.name,
887 config.wallet.hotkey_str,
888 config.netuid,
889 "validator",
890 )
891 )
892 os.makedirs(config.full_path, exist_ok=True)
893 return config
894
895 def setup_logging(self):
896 bt.logging.enable_default()
897 bt.logging.enable_info()
898
899 if self.config.logging.debug:
900 bt.logging.enable_debug()
901 if self.config.logging.trace:
902 bt.logging.enable_trace()
903 bt.logging(config=self.config, logging_dir=self.config.full_path)
904 logger.info(
905 f"Running validator for subnet: {self.config.netuid} on network: {self.config.subtensor.network} with config:"
906 )
907 logger.info(self.config)
908 pass
909
910 def setup_bittensor_objects(self):
911 logger.info("Setting up Bittensor objects.")
912 self.wallet = bt.wallet(config=self.config)
913 logger.info(f"Wallet: {self.wallet}")
914 self.subtensor = bt.subtensor(config=self.config)
915 logger.info(f"Subtensor: {self.subtensor}")
916 self.dendrite = bt.dendrite(wallet=self.wallet)
917 logger.info(f"Dendrite: {self.dendrite}")
918 self.metagraph = self.subtensor.metagraph(self.config.netuid)
919 logger.info(f"Metagraph: {self.metagraph}")
920 if self.wallet.hotkey.ss58_address not in self.metagraph.hotkeys:
921 logger.error(
922 f"\nYour validator: {self.wallet} is not registered to chain connection: {self.subtensor} \nRun 'btcli register' and try again."
923 )
924 exit()
925 else:
926 self.my_subnet_uid = self.metagraph.hotkeys.index(
927 self.wallet.hotkey.ss58_address
928 )
929 logger.info(f"Running validator on uid: {self.my_subnet_uid}")
930
931 def setup_axon(self):
932 self.axon = bt.axon(wallet=self.wallet, config=self.config)
933 logger.info(
934 f"Serving axon on network: {self.config.subtensor.network} with netuid: {self.config.netuid}"
935 )
936 self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor)
937 logger.info(f"Axon: {self.axon}")
938 logger.info(f"Starting axon server on port: {self.config.axon.port}")
939 self.axon.start()
940
941 @abstractmethod
942 async def start_epoch(self):
943 pass
944
945 def run(self):
946 logger.info("Starting validator loop.")
947 while not self.should_exit:
948 start_epoch = time.time()
949
950 try:
951 self.loop.run_until_complete(self.start_epoch())
952 except Exception as e:
953 logger.error(f"Forward error: {e}")
954 traceback.print_exc()
955
956 end_epoch = time.time()
957 elapsed = end_epoch - start_epoch
958 time_to_sleep = max(0, constants.EPOCH_LENGTH - elapsed)
959
960 logger.info(f"Epoch finished. Sleeping for {time_to_sleep} seconds.")
961 time.sleep(time_to_sleep)
962
963 try:
964 self.resync_metagraph()
965 except Exception as e:
966 logger.error(f"Resync metagraph error: {e}")
967 traceback.print_exc()
968
969 # If someone intentionally stops the validator, it'll safely terminate operations.
970 except KeyboardInterrupt:
971 self.axon.stop()
972 logger.success("Validator killed by keyboard interrupt.")
973 exit()
974
975 @abstractmethod
976 def set_weights(self):
977 pass
978
979 def resync_metagraph(self):
980 self.metagraph.sync()
981
982 def run_in_background_thread(self):
983 """
984 Starts the validator's operations in a background thread upon entering the context.
985 This method facilitates the use of the validator in a 'with' statement.
986 """
987 if not self.is_running:
988 logger.debug("Starting validator in background thread.")
989 self.should_exit = False
990 self.thread = threading.Thread(target=self.run, daemon=True)
991 self.thread.start()
992 self.is_running = True
993 logger.debug("Started")
994
995 def __enter__(self):
996 self.run_in_background_thread()
997 return self
998
999 def __exit__(self, exc_type, exc_value, traceback):
1000 """
1001 Stops the validator's background operations upon exiting the context.
1002 This method facilitates the use of the validator in a 'with' statement.
1003
1004 Args:
1005 exc_type: The type of the exception that caused the context to be exited.
1006 None if the context was exited without an exception.
1007 exc_value: The instance of the exception that caused the context to be exited.
1008 None if the context was exited without an exception.
1009 traceback: A traceback object encoding the stack trace.
1010 None if the context was exited without an exception.
1011 """
1012 if self.is_running:
1013 logger.debug("Stopping validator in background thread.")
1014 self.should_exit = True
1015 self.thread.join(5)
1016 self.is_running = False
1017 logger.debug("Stopped")
1018
1019
1020
1021---
1022File: /neural_condense_core/common/__init__.py
1023---
1024
1025from .rate_limit import build_rate_limit
1026from . import base64
1027from .file import clean_tmp_directory
1028
1029__all__ = ["build_rate_limit", "base64", "clean_tmp_directory"]
1030
1031
1032
1033---
1034File: /neural_condense_core/common/base64.py
1035---
1036
1037import numpy as np
1038import base64
1039import io
1040
1041
1042def ndarray_to_base64(array: np.ndarray) -> str:
1043 """Convert a NumPy array to a base64-encoded string."""
1044 try:
1045 buffer = io.BytesIO()
1046 np.save(buffer, array)
1047 buffer.seek(0)
1048 base64_str = base64.b64encode(buffer.read()).decode("utf-8")
1049 except Exception:
1050 base64_str = ""
1051 return base64_str
1052
1053
1054def base64_to_ndarray(base64_str: str) -> np.ndarray:
1055 """Convert a base64-encoded string back to a NumPy array."""
1056 try:
1057 buffer = io.BytesIO(base64.b64decode(base64_str))
1058 buffer.seek(0)
1059 array = np.load(buffer)
1060 array = array.astype(np.float32)
1061 except Exception as e:
1062 print(f"Base64 to ndarray error: {e}")
1063 array = np.array([])
1064 return array
1065
1066
1067
1068---
1069File: /neural_condense_core/common/file.py
1070---
1071
1072import hf_transfer
1073import numpy as np
1074import io
1075import time
1076import httpx
1077import os
1078from rich.progress import track
1079from ..logger import logger
1080import asyncio
1081import sys
1082import uuid
1083from concurrent.futures import ThreadPoolExecutor
1084from firerequests import FireRequests
1085
1086fire_downloader = FireRequests()
1087
1088
1089def clean_tmp_directory():
1090 """Clean the tmp directory if running as validator."""
1091 if (
1092 __name__ != "__main__"
1093 and os.path.basename(os.path.abspath(sys.argv[0])) == "validator.py"
1094 ):
1095 os.makedirs("tmp", exist_ok=True)
1096 for file in track(os.listdir("tmp"), description="Cleaning tmp directory"):
1097 os.remove(os.path.join("tmp", file))
1098
1099
1100def _check_file_size(response: httpx.Response, max_size_mb: int) -> tuple[bool, str]:
1101 """Check if file size is within limits."""
1102 content_length = int(response.headers.get("content-length", 0))
1103 max_size_bytes = max_size_mb * 1024 * 1024
1104
1105 if content_length > max_size_bytes:
1106 return (
1107 False,
1108 f"File too large: {content_length / (1024 * 1024):.1f}MB exceeds {max_size_mb}MB limit",
1109 )
1110 return True, ""
1111
1112
1113def _generate_filename(url: str) -> str:
1114 """Generate a unique filename for downloaded file."""
1115 return os.path.join("tmp", str(uuid.uuid4()) + "_" + url.split("/")[-1])
1116
1117
1118async def _download(url: str) -> tuple[str, float, str]:
1119 """Download file using hf_transfer."""
1120 debug_start_time = time.time()
1121 try:
1122 filename = _generate_filename(url)
1123 start_time = time.time()
1124
1125 await fire_downloader.download_file(
1126 url=url,
1127 filename=filename,
1128 max_files=10, # Number of parallel downloads
1129 chunk_size=1024 * 1024, # 1 MB chunks
1130 parallel_failures=3,
1131 max_retries=5,
1132 headers=None,
1133 show_progress=False,
1134 )
1135
1136 download_time = time.time() - start_time
1137 logger.info(f"Time taken to download: {download_time:.2f} seconds")
1138 return filename, download_time, ""
1139 except Exception as e:
1140 return "", time.time() - debug_start_time, "Download failed: " + str(e)
1141
1142
1143def _load_and_cleanup(filename: str) -> tuple[np.ndarray | None, str]:
1144 """Load NPY file and convert to float32."""
1145 try:
1146 with open(filename, "rb") as f:
1147 buffer = io.BytesIO(f.read())
1148 data = np.load(buffer)
1149 return data.astype(np.float32), ""
1150 except Exception as e:
1151 logger.error(f"Error loading NPY file: {e}")
1152 return None, str(e)
1153
1154
1155async def load_npy_from_url(
1156 url: str, max_size_mb: int = 1024
1157) -> tuple[np.ndarray | None, str, float, str]:
1158 """
1159 Load a `.npy` file from a URL using the FireRequests downloader for efficient downloading.
1160
1161 Args:
1162 url (str): URL of the `.npy` file.
1163 max_size_mb (int): Maximum allowed file size in megabytes.
1164
1165 Returns:
1166 tuple: (data, filename, download_time, error_message)
1167 - data: Loaded NumPy array or None if error
1168 - filename: Local filename where data was saved
1169 - download_time: Time taken to download in seconds
1170 - error_message: Empty string if successful, error description if failed
1171 """
1172 try:
1173 # Check file size using HTTP HEAD request
1174 async with httpx.AsyncClient() as client:
1175 response = await client.head(url)
1176 if response.status_code != 200:
1177 return (
1178 None,
1179 "",
1180 0,
1181 f"Failed to fetch file info: HTTP {response.status_code}",
1182 )
1183
1184 size_ok, error = _check_file_size(response, max_size_mb)
1185 if not size_ok:
1186 return None, "", 0, error
1187
1188 # Download the file asynchronously, then load it
1189 filename, download_time, error = await _download(url)
1190 if error:
1191 return None, "", 0, error
1192
1193 data, error = _load_and_cleanup(filename)
1194 if error:
1195 return None, "", 0, error
1196
1197 return data, filename, download_time, ""
1198
1199 except Exception as e:
1200 return None, "", 0, str(e)
1201
1202
1203# Clean tmp directory on module load if running as validator
1204clean_tmp_directory()
1205
1206
1207
1208---
1209File: /neural_condense_core/common/rate_limit.py
1210---
1211
1212import pandas as pd
1213from ..logger import logger
1214from ..constants import constants
1215
1216
1217def build_rate_limit(metagraph, config=None, tier=None):
1218 S = metagraph.S
1219 if config and config.whitelist_uids:
1220 whitelist_uids = [int(uid) for uid in config.whitelist_uids.split(",")]
1221 else:
1222 whitelist_uids = [i for i in range(len(S)) if S[i] > constants.MIN_STAKE]
1223
1224 selected_tier_config = constants.TIER_CONFIG[tier or config.miner.tier]
1225 rpe = selected_tier_config.requests_per_epoch
1226
1227 # Calculate total stake of whitelisted UIDs
1228 total_stake = sum(S[uid] for uid in whitelist_uids)
1229
1230 # Compute rate limits based on normalized stakes
1231 rate_limits = {}
1232 for uid in whitelist_uids:
1233 normalized_stake = S[uid] / total_stake if total_stake > 0 else 0
1234 rate_limits[uid] = max(int(rpe * normalized_stake), 10)
1235
1236 # Set rate limit to 0 for non-whitelisted UIDs
1237 for uid in range(len(S)):
1238 if uid not in whitelist_uids:
1239 rate_limits[uid] = 0
1240
1241 _df = pd.DataFrame(
1242 {
1243 "uids": whitelist_uids,
1244 "rate_limits": [rate_limits[uid] for uid in whitelist_uids],
1245 }
1246 )
1247 logger.info(f"Rate limits for tier {tier}:\n{_df.to_markdown()}")
1248 return rate_limits
1249
1250
1251
1252---
1253File: /neural_condense_core/miner_utils/__init__.py
1254---
1255
1256from .rate_limit_counter import RateLimitCounter
1257
1258__all__ = ["RateLimitCounter"]
1259
1260
1261
1262---
1263File: /neural_condense_core/miner_utils/rate_limit_counter.py
1264---
1265
1266import time
1267
1268
1269class RateLimitCounter:
1270 def __init__(self, rate_limit: int, period: int):
1271 self.rate_limit = rate_limit
1272 self.period = period
1273 self.count = 0
1274 self.last_reset = time.time()
1275
1276 def is_first_request(self):
1277 return self.count == 0
1278
1279 def increment(self):
1280 now = time.time()
1281 if now - self.last_reset > self.period:
1282 self.count = 0
1283 self.last_reset = now
1284 self.count += 1
1285 return self.count <= self.rate_limit
1286
1287 def reset(self):
1288 self.count = 0
1289 self.last_reset = time.time()
1290
1291 def get_count(self):
1292 return self.count
1293
1294 def get_rate_limit(self):
1295 return self.rate_limit
1296
1297 def get_period(self):
1298 return self.period
1299
1300
1301
1302---
1303File: /neural_condense_core/validator_utils/loop/__init__.py
1304---
1305
1306from .forward import (
1307 get_task_config,
1308 initialize_wandb,
1309 query_miners,
1310 prepare_synapse,
1311 validate_responses,
1312 process_and_score_responses,
1313)
1314from . import logging
1315
1316__all__ = [
1317 "get_task_config",
1318 "initialize_wandb",
1319 "query_miners",
1320 "prepare_synapse",
1321 "validate_responses",
1322 "process_and_score_responses",
1323 "logging",
1324]
1325
1326
1327
1328---
1329File: /neural_condense_core/validator_utils/loop/forward.py
1330---
1331
1332import neural_condense_core as ncc
1333import bittensor as bt
1334import random
1335import httpx
1336import wandb
1337from ...protocol import TextCompressProtocol
1338from ...logger import logger
1339from ..synthesizing.challenge_generator import ChallengeGenerator
1340from ..managing.miner_manager import MinerManager, ServingCounter, MinerMetadata
1341from ...constants import SyntheticTaskConfig, TierConfig
1342import asyncio
1343import os
1344import traceback
1345
1346
1347def get_task_config() -> SyntheticTaskConfig:
1348 """
1349 Get a random task configuration based on weights.
1350
1351 Returns:
1352 SyntheticTaskConfig: The selected task configuration
1353 """
1354 return random.choices(
1355 ncc.constants.SYNTHETIC_TASK_CONFIG,
1356 weights=[t.weight for t in ncc.constants.SYNTHETIC_TASK_CONFIG],
1357 )[0]
1358
1359
1360async def prepare_synapse(
1361 challenge_generator: ChallengeGenerator,
1362 tier: str,
1363 task_config: SyntheticTaskConfig,
1364 tier_config: TierConfig,
1365 model_name: str,
1366) -> TextCompressProtocol:
1367 """
1368 Prepare a synapse for validation.
1369
1370 Args:
1371 challenge_generator (ChallengeGenerator): Generator used to build the challenge
1372 task_config (SyntheticTaskConfig): Configuration for the task
1373 tier_config (TierConfig): Configuration for the tier
1374 model_name (str): Name of the model to use
1375
1376 Returns:
1377 The prepared synapse object
1378 """
1379 try:
1380 synapse = await challenge_generator.generate_challenge(
1381 model_name=model_name,
1382 tier=tier,
1383 task=task_config.task,
1384 max_context_length_in_chars=tier_config.max_context_length_in_chars,
1385 )
1386 synapse.target_model = model_name
1387 synapse.tier = tier
1388 except Exception as e:
1389 logger.error(f"Error generating challenge: {e}")
1390 traceback.print_exc()
1391 return None
1392 return synapse
1393
1394
1395async def query_miners(
1396 dendrite: bt.dendrite,
1397 metagraph: bt.metagraph,
1398 uids: list[int],
1399 synapse,
1400 timeout: int,
1401) -> list[TextCompressProtocol]:
1402 """
1403 Query a group of miners with a synapse.
1404
1405 Args:
1406 dendrite: The dendrite connection
1407 uids (list[int]): List of miner UIDs to query
1408 synapse: The synapse to send
1409 timeout (int): Query timeout in seconds
1410
1411 Returns:
1412 list: Responses from the miners
1413 """
1414 batched_uids = [
1415 uids[i : i + ncc.constants.BATCH_SIZE]
1416 for i in range(0, len(uids), ncc.constants.BATCH_SIZE)
1417 ]
1418 all_responses = []
1419 for batch_uids in batched_uids:
1420 responses = await dendrite.forward(
1421 axons=[metagraph.axons[uid] for uid in batch_uids],
1422 synapse=synapse,
1423 deserialize=False,
1424 timeout=timeout,
1425 )
1426 all_responses.extend(responses)
1427 return all_responses
1428
1429
1430async def validate_responses(
1431 responses: list[TextCompressProtocol],
1432 uids: list[int],
1433 tier_config: TierConfig,
1434 tier: str,
1435 tokenizer=None,
1436 ground_truth_synapse: TextCompressProtocol = None,
1437) -> tuple[list[TextCompressProtocol], list[int], list[int], list[str]]:
1438 valid_responses, valid_uids, invalid_uids, invalid_reasons = [], [], [], []
1439
1440 # Add recursion limit protection
1441 async def verify_single_response(response):
1442 try:
1443 # Add timeout to prevent hanging
1444 is_valid, reason = await asyncio.wait_for(
1445 TextCompressProtocol.verify(
1446 response,
1447 tier_config,
1448 tier,
1449 tokenizer,
1450 ground_truth_synapse,
1451 ),
1452 timeout=360,
1453 )
1454 return is_valid, reason
1455 except asyncio.TimeoutError:
1456 return False, "Verification timeout"
1457 except RecursionError:
1458 return False, "Recursion limit exceeded"
1459 except Exception as e:
1460 return False, f"Failed to verify: {str(e)}"
1461
1462 results = []
1463 for response in responses:
1464 verify_result = await verify_single_response(response)
1465 results.append(verify_result)
1466
1467 # Process results maintaining order
1468 for uid, (is_valid, reason), response in zip(uids, results, responses):
1469 if is_valid:
1470 valid_responses.append(response)
1471 valid_uids.append(uid)
1472 else:
1473 invalid_uids.append(uid)
1474 invalid_reasons.append(reason)
1475
1476 return valid_responses, valid_uids, invalid_uids, invalid_reasons
1477
1478
1479async def process_and_score_responses(
1480 miner_manager: MinerManager,
1481 valid_responses: list[TextCompressProtocol],
1482 valid_uids: list[int],
1483 invalid_uids: list[int],
1484 ground_truth_synapse: TextCompressProtocol,
1485 model_name: str,
1486 task_config: SyntheticTaskConfig,
1487 tier_config: TierConfig,
1488 config: bt.config = None,
1489 invalid_reasons: list[str] = [],
1490 timeout: int = 120,
1491 tier: str = "",
1492) -> dict[str, list]:
1493 if len(valid_responses) > 0:
1494 accuracies, accelerate_rewards = await get_accuracies(
1495 valid_responses=valid_responses,
1496 ground_truth_synapse=ground_truth_synapse,
1497 model_name=model_name,
1498 task_config=task_config,
1499 timeout=timeout,
1500 config=config,
1501 tier=tier,
1502 )
1503 scores = [
1504 (
1505 accu * (1 - tier_config.accelerate_reward_scalar)
1506 + accel * tier_config.accelerate_reward_scalar
1507 )
1508 * (accu > 0)
1509 for accu, accel in zip(accuracies, accelerate_rewards)
1510 ] + [0] * len(invalid_uids)
1511 else:
1512 scores = [0] * len(valid_uids) + [0] * len(invalid_uids)
1513 accuracies = []
1514 accelerate_rewards = []
1515 total_uids = valid_uids + invalid_uids
1516 updated_scores, previous_scores = miner_manager.update_scores(
1517 scores=scores,
1518 total_uids=total_uids,
1519 )
1520 score_changes = [
1521 f"{round(previous_scores[i], 3)} -> {round(updated_scores[i], 3)}"
1522 for i in range(len(previous_scores))
1523 ]
1524 logs = {
1525 "uid": total_uids,
1526 "accuracy": accuracies + [0] * len(invalid_uids),
1527 "accelerate_reward": accelerate_rewards + [0] * len(invalid_uids),
1528 "score_change": score_changes,
1529 "invalid_reasons": [""] * len(valid_uids) + invalid_reasons,
1530 }
1531 return logs, total_uids
1532
1533
1534def update_metrics_of_invalid_miners(
1535 invalid_uids: list[int],
1536 metrics: dict,
1537):
1538 for metric_name, values in metrics.items():
1539 values.extend([0] * len(invalid_uids))
1540 return metrics
1541
1542
1543async def get_accuracies(
1544 valid_responses: list,
1545 ground_truth_synapse: TextCompressProtocol,
1546 model_name: str,
1547 task_config: SyntheticTaskConfig,
1548 timeout: int = 240,
1549 config: bt.config = None,
1550 tier: str = "",
1551) -> tuple[list, list]:
1552 payload = TextCompressProtocol.get_scoring_payload(
1553 responses=valid_responses,
1554 ground_truth_synapse=ground_truth_synapse,
1555 target_model=model_name,
1556 criterias=task_config.criterias,
1557 ).model_dump()
1558 if tier == "universal":
1559 url = f"http://{config.validator.universal_score_backend.host}:{config.validator.universal_score_backend.port}/get_metrics"
1560 else:
1561 url = f"http://{config.validator.score_backend.host}:{config.validator.score_backend.port}/get_metrics"
1562 logger.info(f"Sending payload to scoring backend: {url}")
1563 async with httpx.AsyncClient() as client:
1564 try:
1565 response = await client.post(
1566 url,
1567 json=payload,
1568 timeout=timeout,
1569 )
1570 except Exception as e:
1571 logger.error(f"Error sending payload to scoring backend: {e}")
1572 for r in valid_responses:
1573 try:
1574 if r.util_data.local_filename:
1575 os.remove(r.util_data.local_filename)
1576 except Exception as e:
1577 logger.error(
1578 f"Error removing local file {r.util_data.local_filename}: {e}"
1579 )
1580 logger.info("Removed all local files")
1581 if response.status_code != 200:
1582 raise Exception(
1583 f"Scoring backend returned status code {response.status_code}"
1584 )
1585 scoring_response = response.json()
1586
1587 accuracies = scoring_response["metrics"]["accuracy"]
1588 accelerate_rewards = [r.accelerate_score for r in valid_responses]
1589 return accuracies, accelerate_rewards
1590
1591
1592def initialize_wandb(dendrite: bt.dendrite, metagraph: bt.metagraph, uid: int):
1593 try:
1594 message = "incentivized-decentralzied-condensed-ai" + "-".join(
1595 random.choices("0123456789abcdef", k=16)
1596 )
1597 signature = f"0x{dendrite.keypair.sign(message).hex()}"
1598 wandb.init(
1599 project="Neural-Condense-Subnet",
1600 name=f"validator-{uid}",
1601 entity="toilaluan",
1602 job_type="validation",
1603 group="validator",
1604 resume="allow",
1605 config={
1606 "signature": signature,
1607 "uid": uid,
1608 "message": message,
1609 "ss58_address": metagraph.hotkeys[uid],
1610 },
1611 )
1612 except Exception as e:
1613 logger.error(f"Starting wandb error: {e}")
1614
1615
1616
1617---
1618File: /neural_condense_core/validator_utils/loop/logging.py
1619---
1620
1621import wandb
1622import pandas as pd
1623from ...logger import logger
1624
1625
1626def log_wandb(logs: dict, uids: list[int], tier=""):
1627 try:
1628 for metric, values in logs.items():
1629 if metric == "perplexity":
1630 for uid, value in zip(uids, values):
1631 if value is None or not isinstance(value, float):
1632 continue
1633 wandb.log({f"{tier}-{uid}/perplexity": abs(value)})
1634 except Exception as e:
1635 logger.error(f"Error logging to wandb: {e}")
1636
1637
1638def log_as_dataframe(data: dict):
1639 for metric, values in data.items():
1640 for i in range(len(values)):
1641 if values[i] is None:
1642 values[i] = "N/A"
1643 if isinstance(values[i], float):
1644 values[i] = round(values[i], 2)
1645 df = pd.DataFrame(data)
1646 return df
1647
1648
1649
1650---
1651File: /neural_condense_core/validator_utils/managing/__init__.py
1652---
1653
1654from .miner_manager import MinerManager, ServingCounter
1655
1656__all__ = [
1657 "MinerManager",
1658 "ServingCounter",
1659]
1660
1661
1662
1663---
1664File: /neural_condense_core/validator_utils/managing/metric_converter.py
1665---
1666
1667from ...constants import TierConfig
1668
1669
1670class MetricConverter:
1671 def __init__(self):
1672 self.converters = {
1673 "perplexity": self.perplexity_to_score,
1674 "accuracy": self.accuracy_to_score,
1675 "bleu": self.bleu_to_score,
1676 }
1677
1678 def convert_metrics_to_score(
1679 self, metrics: dict, tier_config: TierConfig
1680 ) -> dict[str, list[float]]:
1681 total_scores = {}
1682 accelerate_bonuses = self.get_accelerate_bonuses(metrics, tier_config)
1683 for metric, values in metrics.items():
1684 try:
1685 converter = self.converters[metric]
1686 scores = converter(values)
1687 scores = [
1688 s * (1 + a) if a is not None else s
1689 for s, a in zip(scores, accelerate_bonuses)
1690 ]
1691 total_scores[metric] = scores
1692 except KeyError:
1693 continue
1694 return total_scores
1695
1696 def perplexity_to_score(self, perplexities: list[float]):
1697 valid_perplexities = [p for p in perplexities if p is not None]
1698 if not valid_perplexities:
1699 return perplexities
1700 pivot = min(valid_perplexities)
1701 scores = [pivot / p if p is not None else None for p in perplexities]
1702 return scores
1703
1704 def accuracy_to_score(self, accuracies: list[float]):
1705 return accuracies
1706
1707 def get_accelerate_bonuses(self, metrics: dict, tier_config: TierConfig):
1708 accelerate_metrics = metrics["accelerate_metrics"]
1709 return [
1710 s * tier_config.accelerate_reward_scalar if s is not None else None
1711 for s in accelerate_metrics
1712 ]
1713
1714 def bleu_to_score(self, bleus: list[float]):
1715 return bleus
1716
1717
1718
1719---
1720File: /neural_condense_core/validator_utils/managing/miner_manager.py
1721---
1722
1723import bittensor as bt
1724import numpy as np
1725import httpx
1726import pandas as pd
1727import time
1728import asyncio
1729from .metric_converter import MetricConverter
1730from ...common import build_rate_limit
1731from ...protocol import Metadata
1732from ...constants import constants
1733from .utils import (
1734 apply_top_percentage_threshold,
1735 standardize_scores,
1736 normalize_and_weight_scores,
1737)
1738from ...logger import logger
1739import redis
1740from sqlalchemy import create_engine, Column, Integer, String, Float
1741from sqlalchemy.ext.declarative import declarative_base
1742from sqlalchemy.orm import sessionmaker
1743
1744Base = declarative_base()
1745
1746
1747class MinerMetadata(Base):
1748 """
1749 SQLAlchemy model for storing miner metadata.
1750
1751 Attributes:
1752 uid (int): Unique identifier for the miner
1753 tier (str): Miner's tier level (default: "unknown")
1754 score (float): Miner's performance score (default: 0.0)
1755 """
1756
1757 __tablename__ = "miner_metadata"
1758
1759 uid = Column(Integer, primary_key=True)
1760 tier = Column(String, default="unknown")
1761 score = Column(Float, default=0.0)
1762
1763 def __init__(self, uid, tier="unknown", score=0.0):
1764 self.uid = uid
1765 self.tier = tier
1766 self.score = score
1767
1768 def to_dict(self):
1769 """Convert metadata to dictionary format."""
1770 return {"uid": self.uid, "tier": self.tier, "score": self.score}
1771
1772
1773class ServingCounter:
1774 """
1775 Redis-based rate limiter for miner requests.
1776
1777 Uses atomic Redis operations to track and limit request rates per miner.
1778
1779 Attributes:
1780 rate_limit (int): Max requests allowed per epoch
1781 redis_client (redis.Redis): Redis connection for distributed counting
1782 key (str): Unique Redis key for this counter
1783 expire_time (int): TTL for counter keys in Redis
1784 """
1785
1786 def __init__(
1787 self,
1788 rate_limit: int,
1789 uid: int,
1790 tier: str,
1791 redis_client: redis.Redis,
1792 postfix_key: str = "",
1793 ):
1794 self.rate_limit = rate_limit
1795 self.redis_client = redis_client
1796 self.key = constants.DATABASE_CONFIG.redis.serving_counter_key_format.format(
1797 tier=tier,
1798 uid=uid,
1799 ) + str(postfix_key)
1800
1801 def increment(self) -> bool:
1802 """
1803 Increment request counter and check rate limit.
1804
1805 Uses atomic Redis INCR operation and sets expiry on first increment.
1806
1807 Reset the counter after EPOCH_LENGTH seconds.
1808
1809 Returns:
1810 bool: True if under rate limit, False if exceeded
1811 """
1812 count = self.redis_client.incr(self.key)
1813
1814 if count == 1:
1815 self.redis_client.expire(self.key, constants.EPOCH_LENGTH)
1816
1817 if count <= self.rate_limit:
1818 return True
1819
1820 logger.info(f"Rate limit exceeded for {self.key}")
1821 return False
1822
1823 def get_current_count(self):
1824 return self.redis_client.get(self.key)
1825
1826 def reset_counter(self):
1827 self.redis_client.set(self.key, 0)
1828
1829
1830class MinerManager:
1831 """
1832 Manages miner metadata, scoring and rate limiting.
1833
1834 Handles:
1835 - Miner metadata storage and updates
1836 - Performance scoring and normalization
1837 - Request rate limiting per miner
1838 - Synchronization with validator network
1839
1840 Attributes:
1841 wallet: Bittensor wallet
1842 dendrite: Network communication client
1843 metagraph: Network state/topology
1844 config: Validator configuration
1845 redis_client: Redis connection
1846 session: SQLAlchemy database session
1847 metric_converter: Converts raw metrics to scores
1848 rate_limit_per_tier: Request limits by tier
1849 """
1850
1851 def __init__(self, uid, wallet, metagraph, config=None):
1852 self.is_main_process = bool(config)
1853 self.config = config
1854 self.uid = uid
1855 self.wallet = wallet
1856 self.dendrite = bt.dendrite(wallet=self.wallet)
1857 self.metagraph = metagraph
1858
1859 # Initialize Redis
1860 redis_config = constants.DATABASE_CONFIG.redis
1861 self.redis_client = redis.Redis(
1862 host=redis_config.host, port=redis_config.port, db=redis_config.db
1863 )
1864
1865 # Initialize SQLAlchemy
1866 self.engine = create_engine(constants.DATABASE_CONFIG.sql.url)
1867 Base.metadata.create_all(self.engine)
1868 Session = sessionmaker(bind=self.engine)
1869 self.session = Session()
1870
1871 self._init_metadata()
1872 self.metric_converter = MetricConverter()
1873 self.rate_limit_per_tier = self.get_rate_limit_per_tier()
1874 logger.info(f"Rate limit per tier: {self.rate_limit_per_tier}")
1875
1876 self.loop = asyncio.get_event_loop()
1877 self.loop.run_until_complete(self.sync())
1878
1879 def get_metadata(self, uids: list[int] = []) -> dict[int, MinerMetadata]:
1880 """
1881 Get metadata for specified miner UIDs.
1882
1883 Args:
1884 uids: List of miner UIDs to fetch. Empty list returns all miners.
1885
1886 Returns:
1887 dict: Mapping of UID to miner metadata
1888 """
1889 query = self.session.query(MinerMetadata)
1890 if uids:
1891 query = query.filter(MinerMetadata.uid.in_(uids))
1892 return {miner.uid: miner for miner in query.all()}
1893
1894 def update_scores(self, scores: list[float], total_uids: list[int]):
1895 """
1896 Update miner scores with exponential moving average.
1897
1898 Args:
1899 scores: New performance scores
1900 total_uids: UIDs corresponding to scores
1901
1902 Returns:
1903 tuple: Updated scores and previous scores
1904 """
1905 updated_scores = []
1906 previous_scores = []
1907
1908 for uid, score in zip(total_uids, scores):
1909 miner = self.session.query(MinerMetadata).get(uid)
1910 previous_scores.append(miner.score)
1911
1912 # EMA with 0.9 decay factor
1913 miner.score = miner.score * 0.9 + score * 0.1
1914 miner.score = max(0, miner.score)
1915 updated_scores.append(miner.score)
1916
1917 self.session.commit()
1918 return updated_scores, previous_scores
1919
1920 def get_normalized_ratings(self, top_percentage: float = 1.0) -> np.ndarray:
1921 """
1922 Calculate normalized ratings across all miners.
1923
1924 Applies:
1925 1. Top percentage thresholding
1926 2. Score standardization
1927 3. Sigmoid compression
1928 4. Tier-based incentive weighting
1929
1930 Args:
1931 top_percentage: Fraction of top miners to consider
1932
1933 Returns:
1934 np.ndarray: Normalized ratings for all miners
1935 """
1936 weights = np.zeros(len(self.metagraph.hotkeys))
1937
1938 for tier in constants.TIER_CONFIG:
1939 tier_weights = self._get_tier_weights(tier, top_percentage)
1940 for uid, weight in tier_weights.items():
1941 weights[uid] = weight
1942
1943 return weights
1944
1945 def _get_tier_weights(self, tier: str, top_percentage: float) -> dict[int, float]:
1946 """
1947 Calculate weights for miners in a specific tier.
1948
1949 Args:
1950 tier: The tier to calculate weights for
1951 top_percentage: Fraction of top miners to consider
1952
1953 Returns:
1954 dict: Mapping of UID to weight for miners in tier
1955 """
1956 # Get scores for miners in this tier
1957 miners = self.session.query(MinerMetadata).filter_by(tier=tier).all()
1958 tier_scores = [m.score for m in miners]
1959 tier_uids = [m.uid for m in miners]
1960
1961 if not tier_scores:
1962 return {}
1963
1964 scores = apply_top_percentage_threshold(tier_scores, tier_uids, top_percentage)
1965 scores = normalize_and_weight_scores(scores, tier)
1966
1967 return dict(zip(tier_uids, scores))
1968
1969 def _init_metadata(self):
1970 """Initialize metadata entries for all miners."""
1971 for uid in self.metagraph.uids:
1972 try:
1973 self.session.query(MinerMetadata).get(uid)
1974 except Exception as e:
1975 logger.info(f"Reinitialize uid {uid}, {e}")
1976 self.session.add(MinerMetadata(uid=uid))
1977 self.session.commit()
1978
1979 async def sync(self):
1980 """Synchronize metadata and rate limiters."""
1981 logger.info("Synchronizing metadata and serving counters.")
1982 self.rate_limit_per_tier = self.get_rate_limit_per_tier()
1983 logger.info(f"Rate limit per tier: {self.rate_limit_per_tier}")
1984
1985 self.serving_counter = self._create_serving_counter()
1986
1987 if self.is_main_process:
1988 await self._update_metadata()
1989 self._log_metadata()
1990
1991 def _log_metadata(self):
1992 """Log current metadata as formatted DataFrame."""
1993 metadata = {
1994 m.uid: {"tier": m.tier, "elo_rating": m.score * 100}
1995 for m in self.session.query(MinerMetadata).all()
1996 }
1997 df = pd.DataFrame(metadata).T.reset_index()
1998 df.columns = ["uid", "tier", "elo_rating"]
1999 logger.info("Metadata:\n" + df.to_markdown())
2000
2001 async def report_metadata(self):
2002 """Report metadata to validator server."""
2003 metadata = {
2004 m.uid: {"tier": m.tier, "elo_rating": m.score * 100}
2005 for m in self.session.query(MinerMetadata).all()
2006 }
2007 await self.report(metadata, "api/report-metadata")
2008
2009 async def report(self, payload: dict, endpoint: str):
2010 """
2011 Send signed report to validator server.
2012
2013 Args:
2014 payload: Data to report
2015 endpoint: API endpoint path
2016 """
2017 url = f"{self.config.validator.report_url}/{endpoint}"
2018 nonce = str(time.time_ns())
2019 signature = f"0x{self.dendrite.keypair.sign(nonce).hex()}"
2020
2021 headers = {
2022 "Content-Type": "application/json",
2023 "message": nonce,
2024 "ss58_address": self.wallet.hotkey.ss58_address,
2025 "signature": signature,
2026 }
2027
2028 async with httpx.AsyncClient() as client:
2029 response = await client.post(
2030 url,
2031 json=payload,
2032 headers=headers,
2033 timeout=32,
2034 )
2035
2036 if response.status_code != 200:
2037 logger.error(f"Failed to report to {endpoint}. Response: {response.text}")
2038 else:
2039 logger.info(f"Reported to {endpoint}.")
2040
2041 async def _update_metadata(self):
2042 """Update metadata by querying miner status."""
2043 synapse = Metadata()
2044 uids = list(range(len(self.metagraph.hotkeys)))
2045 axons = [self.metagraph.axons[uid] for uid in uids]
2046
2047 responses = await self.dendrite.forward(
2048 axons,
2049 synapse,
2050 deserialize=False,
2051 timeout=16,
2052 )
2053
2054 for uid, response in zip(uids, responses):
2055 miner = self.session.query(MinerMetadata).get(uid)
2056 if not miner:
2057 miner = MinerMetadata(uid=uid)
2058 self.session.add(miner)
2059
2060 current_tier = miner.tier
2061 new_tier = current_tier
2062
2063 if response and response.metadata.get("tier") is not None:
2064 new_tier = response.metadata["tier"]
2065
2066 if new_tier != current_tier:
2067 logger.info(
2068 f"Tier of uid {uid} changed from {current_tier} to {new_tier}"
2069 )
2070 miner.score = 0
2071
2072 miner.tier = new_tier
2073
2074 self.session.commit()
2075 logger.info(f"Updated metadata for {len(uids)} uids")
2076
2077 def get_rate_limit_per_tier(self):
2078 """Get request rate limits for each tier."""
2079 return {
2080 tier: build_rate_limit(self.metagraph, self.config, tier)[self.uid]
2081 for tier in constants.TIER_CONFIG
2082 }
2083
2084 def _create_serving_counter(self):
2085 """
2086 Create rate limiters for each miner by tier.
2087
2088 Returns:
2089 dict: Nested dict of tier -> uid -> counter
2090 """
2091 counters = {tier: {} for tier in constants.TIER_CONFIG}
2092
2093 for miner in self.session.query(MinerMetadata).all():
2094 tier = miner.tier
2095 if tier not in constants.TIER_CONFIG:
2096 continue
2097
2098 counter = ServingCounter(
2099 self.rate_limit_per_tier[tier], miner.uid, tier, self.redis_client
2100 )
2101 counters[tier][miner.uid] = counter
2102
2103 return counters
2104
2105
2106
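A short worked example of the 0.9/0.1 exponential moving average that `update_scores` applies; the observations below are arbitrary illustrative numbers.

```python
# Worked example of the exponential moving average applied in
# MinerManager.update_scores: 90% of the old score is kept, 10% of the new
# observation is blended in, and the result is clamped at zero.
# The observations are arbitrary illustrative values.

def ema_update(previous: float, observed: float) -> float:
    return max(0.0, previous * 0.9 + observed * 0.1)


score = 0.0
for observed in [1.0, 1.0, 0.5]:
    score = ema_update(score, observed)
    print(round(score, 4))
# 0.1    -> 0.0 * 0.9 + 1.0 * 0.1
# 0.19   -> 0.1 * 0.9 + 1.0 * 0.1
# 0.221  -> 0.19 * 0.9 + 0.5 * 0.1
```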
2107---
2108File: /neural_condense_core/validator_utils/managing/utils.py
2109---
2110
2111import numpy as np
2112from ...constants import constants
2113from ...logger import logger
2114
2115
2116def apply_top_percentage_threshold(
2117 scores: list[float], uids: list[int], top_percentage: float
2118) -> np.ndarray:
2119 """Apply threshold to keep only top percentage of scores."""
2120 n_top = max(1, int(len(scores) * top_percentage))
2121 top_miners = sorted(zip(uids, scores), key=lambda x: x[1], reverse=True)[:n_top]
2122 top_uids = {uid for uid, _ in top_miners}
2123
2124 return np.array(
2125 [score if uid in top_uids else 0 for uid, score in zip(uids, scores)]
2126 )
2127
2128
2129def standardize_scores(scores: np.ndarray, tier: str) -> np.ndarray:
2130 """Standardize non-zero scores using mean and clamped standard deviation."""
2131 nonzero = scores > 0
2132 if not np.any(nonzero):
2133 return scores
2134
2135 curr_std = np.std(scores[nonzero])
2136 curr_mean = np.mean(scores[nonzero])
2137
2138 if curr_std > 0:
2139 target_std = min(curr_std, constants.EXPECTED_MAX_STD_SCORE)
2140 scale = target_std / curr_std
2141
2142 centered = scores[nonzero] - curr_mean
2143 scaled = centered * scale
2144 compressed = np.tanh(scaled * 0.5) * target_std
2145 scores[nonzero] = compressed + constants.EXPECTED_MEAN_SCORE
2146
2147 logger.info(
2148 "adjust_ratings",
2149 tier=tier,
2150 mean=curr_mean,
2151 std=curr_std,
2152 scale_factor=scale,
2153 )
2154
2155 return scores
2156
2157
2158def normalize_and_weight_scores(scores: np.ndarray, tier: str) -> np.ndarray:
2159 """Normalize scores to sum to 1 and apply tier incentive weighting."""
2160 total = np.sum(scores)
2161 if total > 0:
2162 scores = scores / total
2163
2164 # --Smoothing Update---
2165 from datetime import datetime, timezone
2166
2167 current_datetime = datetime.now(timezone.utc)
2168 target_datetime = datetime(2025, 1, 24, 12, 0, 0, tzinfo=timezone.utc)
2169
2170 if current_datetime < target_datetime:
2171 logger.info("Using early incentive scaling")
2172 if tier == "research":
2173 scale = 0.9
2174 elif tier == "universal":
2175 scale = 0.1
2176 else:
2177 logger.info("Using stable incentive scaling")
2178 scale = constants.TIER_CONFIG[tier].incentive_percentage
2179
2180 return scores * scale
2181
2182
2183
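The thresholding and normalization above can be illustrated with a toy example; the UIDs and scores are made up, and the helper is re-declared locally so the snippet runs on its own.

```python
# Toy illustration of the weighting pipeline: only the top fraction of miners
# keeps its score, the rest are zeroed out, and the survivors are normalized.
# UIDs and scores are made-up example values.
import numpy as np


def apply_top_percentage_threshold(scores, uids, top_percentage):
    n_top = max(1, int(len(scores) * top_percentage))
    top_miners = sorted(zip(uids, scores), key=lambda x: x[1], reverse=True)[:n_top]
    top_uids = {uid for uid, _ in top_miners}
    return np.array([s if uid in top_uids else 0 for uid, s in zip(uids, scores)])


uids = [10, 11, 12, 13]
scores = [0.9, 0.2, 0.7, 0.4]
kept = apply_top_percentage_threshold(scores, uids, top_percentage=0.5)
print(kept)               # [0.9 0.  0.7 0. ]  -> only the top two miners survive
print(kept / kept.sum())  # [0.5625 0. 0.4375 0.] -> normalized, as in normalize_and_weight_scores
```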
2184---
2185File: /neural_condense_core/validator_utils/monetize/__init__.py
2186---
2187
2188from .organic_gate import OrganicGate
2189
2190__all__ = [
2191 "OrganicGate",
2192]
2193
2194
2195
2196---
2197File: /neural_condense_core/validator_utils/monetize/organic_gate.py
2198---
2199
2200from fastapi import FastAPI, Depends, Request
2201from fastapi.exceptions import HTTPException
2202import pydantic
2203import asyncio
2204import bittensor as bt
2205import uvicorn
2206from concurrent.futures import ThreadPoolExecutor
2207import random
2208import httpx
2209import time
2210from ...constants import constants
2211from ...protocol import TextCompressProtocol
2212from ..managing import MinerManager
2213from ...logger import logger
2214
2215
2216class OrganicPayload(pydantic.BaseModel):
2217 context: str
2218 tier: str
2219 target_model: str
2220 miner_uid: int = -1
2221 top_incentive: float = 0.9
2222
2223
2224class OrganicResponse(pydantic.BaseModel):
2225 compressed_kv_url: str
2226 miner_uid: int = -1
2227 compressed_context: str
2228
2229
2230class RegisterPayload(pydantic.BaseModel):
2231 port: int
2232
2233
2234class OrganicGate:
2235 def __init__(
2236 self,
2237 miner_manager: MinerManager,
2238 config: bt.config,
2239 ):
2240 self.metagraph: bt.metagraph.__class__ = miner_manager.metagraph
2241 self.miner_manager = miner_manager
2242 self.wallet = miner_manager.wallet
2243 self.config = config
2244 self.dendrite = bt.dendrite(wallet=miner_manager.wallet)
2245 self.app = FastAPI()
2246 self.app.add_api_route(
2247 "/forward",
2248 self.forward,
2249 methods=["POST"],
2250 dependencies=[Depends(self.get_self)],
2251 )
2252 self.app.add_api_route(
2253 "/health",
2254 self.health_check,
2255 methods=["GET"],
2256 dependencies=[Depends(self.get_self)],
2257 )
2258 self.client_axon: bt.AxonInfo = None
2259 self.authentication_key = "".join(random.choices("0123456789abcdef", k=16))
2260
2261 async def _run_function_periodically(self, function, interval):
2262 while True:
2263 logger.info(
2264 f"Running function {function.__name__} every {interval} seconds."
2265 )
2266 try:
2267 await function()
2268 except Exception as e:
2269 logger.error(f"Error running function {function.__name__}: {e}")
2270 await asyncio.sleep(interval)
2271
2272 async def register_to_client(self):
2273 logger.info("Registering to client.")
2274 payload = RegisterPayload(port=self.config.validator.gate_port)
2275 logger.info(f"Payload: {payload}")
2276 try:
2277 response = await self.call(payload, timeout=12)
2278 logger.info(f"Registration response: {response}")
2279 except Exception as e:
2280 logger.error(f"Error during registration: {e}")
2281
2282 async def _authenticate(self, request: Request):
2283 message = request.headers["message"]
2284 if message != self.authentication_key:
2285 raise Exception("Authentication failed.")
2286
2287 async def forward(self, request: Request):
2288 try:
2289 await self._authenticate(request)
2290 logger.info("Forwarding organic request.")
2291 request: OrganicPayload = OrganicPayload(**await request.json())
2292 synapse = TextCompressProtocol(
2293 context=request.context,
2294 target_model=request.target_model,
2295 )
2296 logger.info(f"Context: {request.context[:100]}...")
2297 logger.info(f"Tier: {request.tier}")
2298 logger.info(f"Target model: {request.target_model}")
2299 targeted_uid = None
2300 if request.miner_uid != -1:
2301 counter = self.miner_manager.serving_counter[request.tier][
2302 request.miner_uid
2303 ]
2304 if counter.increment():
2305 targeted_uid = request.miner_uid
2306 else:
2307 logger.warning(f"Miner {request.miner_uid} is under rate limit.")
2308 return HTTPException(
2309 status_code=503,
2310 detail="Miner is under rate limit.",
2311 )
2312 else:
2313 metadata = self.miner_manager.get_metadata()
2314 tier_miners = [
2315 (uid, metadata.score)
2316 for uid, metadata in metadata.items()
2317 if metadata.tier == request.tier
2318 ]
2319 tier_miners.sort(key=lambda x: x[1], reverse=True)
2320
2321 # Try top miners until we find one under rate limit
2322 top_k = max(1, int(len(tier_miners) * request.top_incentive))
2323 top_miners = tier_miners[:top_k]
2324 random.shuffle(top_miners) # Randomize among top performers
2325 logger.info(f"Top {top_k} miners: {top_miners}")
2326
2327 for uid, _ in top_miners:
2328 if uid in self.miner_manager.serving_counter[request.tier]:
2329 counter = self.miner_manager.serving_counter[request.tier][uid]
2330 if counter.increment():
2331 targeted_uid = uid
2332 break
2333
2334 if targeted_uid is None:
2335 raise HTTPException(
2336 status_code=503,
2337 detail="No miners available.",
2338 )
2339 target_axon = self.metagraph.axons[targeted_uid]
2340 response: TextCompressProtocol = await self.dendrite.forward(
2341 axons=target_axon,
2342 synapse=synapse,
2343 timeout=constants.TIER_CONFIG[request.tier].timeout,
2344 deserialize=False,
2345 )
2346 # asyncio.create_task(self._organic_validating(response, request.tier))
2347 logger.info(
2348 f"Compressed to url: {response.compressed_kv_url}. Process time: {response.dendrite.process_time}"
2349 )
2350 except Exception as e:
2351 logger.error(f"Error: {e}")
2352 raise HTTPException(status_code=503, detail="Validator error.")
2353
2354 return OrganicResponse(
2355 compressed_kv_url=response.compressed_kv_url, miner_uid=targeted_uid, compressed_context=response.compressed_context
2356 )
2357
2358 async def _organic_validating(self, response, tier):
2359 if random.random() < constants.ORGANIC_VERIFY_FREQUENCY:
2360 is_valid, reason = await TextCompressProtocol.verify(
2361 response, constants.TIER_CONFIG[tier]
2362 )
2363
2364 if not is_valid:
2365 logger.warning(f"Invalid response: {reason}")
2366
2367 # TODO: Update miner's score
2368
2369 def start_server(self):
2370 self.executor = ThreadPoolExecutor(max_workers=1)
2371
2372 async def startup():
2373 config = uvicorn.Config(
2374 self.app,
2375 host="0.0.0.0",
2376 port=self.config.validator.gate_port,
2377 loop="asyncio",
2378 )
2379 server = uvicorn.Server(config)
2380 await server.serve()
2381
2382 async def run_all():
2383 registration_task = self._run_function_periodically(
2384 self.register_to_client, 60
2385 )
2386 server_task = startup()
2387 await asyncio.gather(registration_task, server_task)
2388
2389 asyncio.run(run_all())
2390
2391 async def get_self(self):
2392 return self
2393
2394 async def health_check(self):
2395 return {"status": "healthy"}
2396
2397 async def call(
2398 self,
2399 payload: RegisterPayload,
2400 timeout: float = 12.0,
2401 ) -> bt.Synapse:
2402 """
2403 Customized call method to send Synapse-like requests to the Organic Client Server.
2404
2405 Args:
2406            payload (pydantic.BaseModel): The payload to send in the request.
2407            timeout (float, optional): Maximum duration to wait for a response from
2408                the Organic Client Server, in seconds. Defaults to ``12.0``.
2409
2410        Returns:
2411            dict | None: The JSON response from the Organic Client Server on success,
2412                or ``None`` if the registration request fails.
2413 """
2414
2415 url = f"{self.config.validator.organic_client_url}/register"
2416 nonce = str(time.time_ns())
2417 message = self.authentication_key + ":" + nonce
2418 signature = f"0x{self.dendrite.keypair.sign(message).hex()}"
2419
2420 headers = {
2421 "Content-Type": "application/json",
2422 "message": message,
2423 "ss58_address": self.wallet.hotkey.ss58_address,
2424 "signature": signature,
2425 }
2426
2427 async with httpx.AsyncClient() as client:
2428 response = await client.post(
2429 url,
2430 json=payload.model_dump(),
2431 headers=headers,
2432 timeout=timeout,
2433 )
2434
2435 if response.status_code != 200:
2436 logger.error(
2437 f"Failed to register to the Organic Client Server. Response: {response.text}"
2438 )
2439 return
2440 else:
2441 logger.info("Registered to the Organic Client Server.")
2442 return response.json()
2443
2444
2445
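The gate above exposes `/forward` for organic requests. The sketch below shows what a client call could look like; the host, port, and the `message` header value are placeholders, since the real gate only accepts the session key it registered with the organic client server.

```python
# Hypothetical client-side call to the organic gate's /forward route.
# The host, port, and "message" header value are placeholders: the real gate
# only accepts the random authentication_key it registered with the organic
# client server, so this sketch will not authenticate as written.
import httpx

payload = {
    "context": "Long document to be compressed ...",
    "tier": "universal",
    "target_model": "unsloth/Meta-Llama-3.1-8B-Instruct",
    "miner_uid": -1,       # -1 lets the gate pick among the top miners
    "top_incentive": 0.9,  # fraction of top-scoring miners considered
}

response = httpx.post(
    "http://localhost:12345/forward",  # assumed --validator.gate_port
    json=payload,
    headers={"message": "<gate-authentication-key>"},
    timeout=32,
)
print(response.json())  # OrganicResponse: compressed_kv_url, miner_uid, compressed_context
```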
2446---
2447File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/__init__.py
2448---
2449
2450from typing import List
2451from .context_loader import load_wikipedia_science_dataset
2452from .infinity_iterable_dataset import InfiniteDataset
2453
2454
2455def load_context_datasets() -> List[InfiniteDataset]:
2456 return [
2457 InfiniteDataset(load_wikipedia_science_dataset().shuffle(seed=42)),
2458 ]
2459
2460
2461
2462---
2463File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/context_loader.py
2464---
2465
2466from datasets import load_dataset
2467
2468
2469def load_wikipedia_science_dataset():
2470 ds = load_dataset(
2471 "Laz4rz/wikipedia_science_chunked_small_rag_512", streaming=True, split="train"
2472 )
2473 ds = ds.shuffle()
2474 ds = ds.filter(lambda x: len(x["text"]) > 512)
2475 ds = ds.map(lambda x: {"context": x["text"]})
2476 print("Loaded wikipedia science dataset")
2477 return ds
2478
2479
2480
2481---
2482File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/infinity_iterable_dataset.py
2483---
2484
2485from typing import Iterator
2486from datasets import IterableDataset
2487
2488
2489class InfiniteDataset:
2490 def __init__(self, dataset: IterableDataset):
2491 """
2492 Initialize the InfiniteDataset wrapper.
2493
2494 Args:
2495 dataset (IterableDataset): The dataset object to wrap. It should be an iterable dataset.
2496 """
2497 self.dataset = dataset
2498 self.iterator = iter(self.dataset) # Initialize the iterator
2499
2500 def __iter__(self) -> Iterator:
2501 """
2502 Return the iterator for the dataset.
2503
2504 Returns:
2505 Iterator: An iterator over the dataset.
2506 """
2507 return self
2508
2509 def __next__(self):
2510 """
2511 Get the next item in the dataset. Automatically reinitialize the iterator if the end is reached.
2512
2513 Returns:
2514 The next item in the dataset.
2515 """
2516 try:
2517 return next(self.iterator)
2518 except StopIteration:
2519 # Reinitialize iterator if the end is reached
2520 self.iterator = iter(self.dataset)
2521 return next(self.iterator) # Return the first item of the new iterator
2522
2523
2524
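A minimal usage sketch of the wrap-around behaviour; a plain Python list stands in for the streaming HuggingFace dataset, because the wrapper only relies on `iter()` and `next()`.

```python
# Minimal usage sketch of the wrap-around iteration. A plain list stands in
# for the streaming HuggingFace IterableDataset.

class InfiniteList:
    def __init__(self, dataset):
        self.dataset = dataset
        self.iterator = iter(self.dataset)

    def __next__(self):
        try:
            return next(self.iterator)
        except StopIteration:
            self.iterator = iter(self.dataset)  # restart at the end
            return next(self.iterator)


ds = InfiniteList([{"context": "a"}, {"context": "b"}])
print([next(ds)["context"] for _ in range(5)])  # ['a', 'b', 'a', 'b', 'a']
```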
2525---
2526File: /neural_condense_core/validator_utils/synthesizing/custom_dataset_loaders/instruction_loader.py
2527---
2528
2529from datasets import load_dataset, IterableDataset
2530
2531
2532def load_orca_instruct_dataset() -> IterableDataset:
2533 ds = load_dataset(
2534 "mlabonne/orca-agentinstruct-1M-v1-cleaned", split="train", streaming=True
2535 )
2536 ds = ds.map(
2537 lambda x: {
2538 "messages": x["messages"],
2539 }
2540 )
2541 return ds
2542
2543
2544def load_open_math_instruct_dataset() -> IterableDataset:
2545 ds = load_dataset("nvidia/OpenMathInstruct-2", split="train", streaming=True)
2546 ds = ds.map(
2547 lambda x: {
2548 "messages": [
2549 {"role": "user", "content": x["problem"]},
2550 {"role": "assistant", "content": x["generated_solution"]},
2551 ]
2552 }
2553 )
2554 return ds
2555
2556
2557
2558---
2559File: /neural_condense_core/validator_utils/synthesizing/__init__.py
2560---
2561
2562from .challenge_generator import ChallengeGenerator
2563
2564__all__ = [
2565 "ChallengeGenerator",
2566]
2567
2568
2569
2570---
2571File: /neural_condense_core/validator_utils/synthesizing/challenge_generator.py
2572---
2573
2574from transformers import AutoTokenizer
2575import re
2576import substrateinterface as st
2577from .scheduler import Scheduler
2578from .convo_generator import ConvoGenerator
2579from .schemas import QASchedulerConfig
2580import random
2581import os
2582from typing import Tuple, List
2583from ...protocol import TextCompressProtocol
2584from .filter_chunker import FilterExistanceChecker
2585from ...constants import constants, ChatTemplate
2586from .utils import retry
2587
2588CORCEL_API_KEY = os.getenv("CORCEL_API_KEY")
2589CORCEL_BASE_URL = os.getenv(
2590 "CORCEL_BASE_URL", "https://api.corcel.io/v1/text/vision/chat"
2591)
2592GENERATOR_MODEL_ID = os.getenv("GENERATOR_MODEL_ID", "llama-3-1-8b")
2593
2594
2595class ChallengeGenerator:
2596 def __init__(self, keypair: st.Keypair):
2597 """
2598 Initialize the ChallengeGenerator class with various dataset loaders and configuration tokens.
2599 """
2600 self.generator = ConvoGenerator(keypair=keypair)
2601 self.synthesizer = Scheduler(
2602 generator=self.generator,
2603 qa_config=QASchedulerConfig(n_qa_per_context=4, max_items=100),
2604 refresh_time=60.0,
2605 )
2606 self.synthesizer.start()
2607 self.start_activation_token = "<START-ACTIVATE-TOKEN>"
2608 self.end_activation_token = "<END-ACTIVATE-TOKEN>"
2609 self.task_to_builder = {
2610 "question_answering": self._build_qa_conversation,
2611 }
2612 self.filter_checker = FilterExistanceChecker(chunk_size=256)
2613
2614 @retry(max_attempts=3)
2615 async def generate_challenge(
2616 self,
2617 model_name: str,
2618 tier: str,
2619 task: str = "question_answering",
2620 max_context_length_in_chars: int = 10000,
2621 ) -> TextCompressProtocol:
2622 if tier == "universal":
2623 chat_template = None
2624 else:
2625 chat_template = constants.CHAT_TEMPLATES[model_name.split("/")[-1]]
2626 try:
2627 context, challenge_question, challenge_answer = await self.task_to_builder[
2628 task
2629 ](max_context_length_in_chars)
2630 positive_chunks, negative_chunks = self.filter_checker.get_chunks(context)
2631 synapse = self._build_protocol(
2632 chat_template,
2633 context,
2634 challenge_question,
2635 challenge_answer,
2636 positive_chunks,
2637 negative_chunks,
2638 )
2639 except Exception as e:
2640 raise e
2641 return synapse
2642
2643 @retry(max_attempts=3)
2644 async def _build_qa_conversation(self, max_chars: int) -> Tuple[str, str, str]:
2645 context_qa_items = await self.synthesizer.get_qas(n=50)
2646 context = ""
2647 question_answer_pairs = []
2648 for qa_item in context_qa_items:
2649 if len(context) + len(qa_item.context_seed) > max_chars:
2650 continue
2651 context += f"\n{qa_item.context_seed}"
2652 questions = qa_item.questions
2653 answers = qa_item.answers
2654 question_answer_pairs.extend(list(zip(questions, answers)))
2655 random.shuffle(question_answer_pairs)
2656 challenge_qa_pairs = random.sample(question_answer_pairs, 5)
2657 challenge_questions = [qa_pair[0] for qa_pair in challenge_qa_pairs]
2658 challenge_answers = [qa_pair[1] for qa_pair in challenge_qa_pairs]
2659
2660 return context, challenge_questions, challenge_answers
2661
2662 def _build_protocol(
2663 self,
2664 chat_template: ChatTemplate,
2665 context: str,
2666 challenge_questions: List[str],
2667 challenge_answers: List[str],
2668 positive_chunks: List[str],
2669 negative_chunks: List[str],
2670 ) -> TextCompressProtocol:
2671 if chat_template is None:
2672 formatted_context = context
2673 formatted_questions = challenge_questions
2674 else:
2675 formatted_context = chat_template.apply_context_template(context)
2676 formatted_questions = [
2677 chat_template.apply_question_template(question)
2678 for question in challenge_questions
2679 ]
2680
2681 return TextCompressProtocol.model_validate(
2682 {
2683 "task_data": {
2684 "original_context": context,
2685 "challenge_questions": challenge_questions,
2686 "challenge_answers": challenge_answers,
2687 "formatted_questions": formatted_questions,
2688 "positive_chunks": positive_chunks,
2689 "formatted_context": formatted_context,
2690 "negative_chunks": negative_chunks,
2691 },
2692 "context": formatted_context,
2693 }
2694 )
2695
2696
2697
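To make the budget-and-sample logic of `_build_qa_conversation` concrete, here is a toy version with invented QA items and a 200-character budget; the real builder draws `QASet` items from the Redis-backed scheduler instead.

```python
# Toy version of the assembly logic in _build_qa_conversation: context seeds
# are appended until the character budget is exhausted, then a handful of
# question/answer pairs is sampled from the included seeds. The QA items and
# the 200-character budget are invented placeholders.
import random

qa_items = [
    {"context_seed": "Water boils at 100 C at sea level.",
     "questions": ["At what temperature does water boil at sea level?"],
     "answers": ["100 C"]},
    {"context_seed": "The Nile is the longest river in Africa.",
     "questions": ["Which is the longest river in Africa?"],
     "answers": ["The Nile"]},
]

max_chars = 200
context, pairs = "", []
for item in qa_items:
    if len(context) + len(item["context_seed"]) > max_chars:
        continue  # skip seeds that would exceed the budget
    context += f"\n{item['context_seed']}"
    pairs.extend(zip(item["questions"], item["answers"]))

random.shuffle(pairs)
challenge = random.sample(pairs, k=min(2, len(pairs)))
print(context.strip())
print(challenge)  # e.g. [('Which is the longest river in Africa?', 'The Nile'), ...]
```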
2698---
2699File: /neural_condense_core/validator_utils/synthesizing/convo_generator.py
2700---
2701
2702from typing import Dict, List, Optional
2703import httpx
2704import substrateinterface as st
2705import time
2706import random
2707
2708
2709class ConvoGenerator:
2710 def __init__(
2711 self,
2712 keypair: st.Keypair,
2713 ):
2714 self.model_id = "chat-llama-3-1-8b"
2715 self.model_ids = [
2716 "chat-llama-3-1-8b",
2717 "chat-llama-3-1-70b",
2718 ]
2719 self.url = "https://api.nineteen.ai/v1/chat/completions"
2720 self.keypair = keypair
2721 self.client = httpx.AsyncClient()
2722
2723 def _get_headers(self):
2724 nonce = str(time.time_ns())
2725 signature = f"0x{self.keypair.sign(nonce).hex()}"
2726 return {
2727 "validator-hotkey": self.keypair.ss58_address,
2728 "signature": signature,
2729 "nonce": nonce,
2730 "netuid": "47",
2731 "Content-Type": "application/json",
2732 }
2733
2734 def _get_assistant_messages(self, messages, n_few_shots):
2735 a_messages = messages[n_few_shots:] # Skip few shots
2736 for i in range(len(a_messages)):
2737 if a_messages[i]["role"] == "assistant":
2738 a_messages[i]["role"] = "user"
2739 else:
2740 a_messages[i]["role"] = "assistant"
2741 return a_messages
2742
2743 async def _make_api_call(self, messages, sampling_params):
2744 payload = sampling_params | {
2745 "model": random.choice(self.model_ids),
2746 "messages": messages,
2747 }
2748 response = await self.client.post(
2749 self.url, json=payload, headers=self._get_headers(), timeout=32
2750 )
2751 if response.status_code != 200:
2752 raise Exception(f"Nineteen API Error: {response.text}")
2753 data = response.json()
2754 content = data["choices"][0]["message"]["content"]
2755 return content
2756
2757 async def generate_conversation(
2758 self,
2759 messages_seed: Optional[List[Dict[str, str]]] = None,
2760 max_turns: int = 4,
2761 ):
2762 assert (
2763 messages_seed[0]["role"] == "user"
2764 and messages_seed[-1]["role"] == "assistant"
2765 ), "First and last message must be a user and assistant message respectively"
2766 assert (
2767 len(messages_seed) % 2 == 0
2768 ), "messages_seed must have an even number of messages"
2769
2770 reversed_messages_seed = []
2771 role_switcher = {"user": "assistant", "assistant": "user"}
2772 for message in messages_seed:
2773 content = message["content"]
2774 role = message["role"]
2775 reversed_messages_seed.append(
2776 {"role": role_switcher[role], "content": content}
2777 )
2778 assert max_turns % 2 == 0, "max_turns must be even"
2779 messages = [
2780 {
2781 "role": "user",
2782 "content": (
2783 "Your task is to act as a human and questioning on me. "
2784 "You can ask me anything, I will give you the answer."
2785 "You have to talk like a human, concisely and dont show emotion."
2786 ),
2787 },
2788 {
2789 "role": "assistant",
2790 "content": "Sure, we will start with a simple question: 1+1=?",
2791 },
2792 {
2793 "role": "user",
2794 "content": "2",
2795 },
2796 ]
2797 n_few_shots = len(messages)
2798 messages.extend(reversed_messages_seed)
2799 sampling_params = {"temperature": 0.4, "max_tokens": 1024, "stream": False}
2800 # Get first response
2801 text = await self._make_api_call(messages, sampling_params)
2802 messages.append({"role": "assistant", "content": text})
2803
2804 # Generate multiple conversation turns
2805 assistant_messages = self._get_assistant_messages(messages, n_few_shots)
2806 for i in range(max_turns):
2807 # CALL ASSISTANT-MESSAGES -> ASSISTANT-MESSAGES
2808 text = await self._make_api_call(assistant_messages, sampling_params)
2809 assistant_messages.append({"role": "assistant", "content": text})
2810 messages.append({"role": "user", "content": text})
2811 if i == max_turns - 1:
2812 break
2813 # CALL MESSAGES -> FAKE--MESSAGES
2814 text = await self._make_api_call(messages, sampling_params)
2815 assistant_messages.append({"role": "user", "content": text})
2816 messages.append({"role": "assistant", "content": text})
2817
2818 total_chars = 0
2819 for i in range(len(assistant_messages)):
2820 total_chars += len(assistant_messages[i]["content"])
2821 return assistant_messages, total_chars
2822
2823 async def generate_qa_pairs(self, context_seed: str, num_questions: int = 1):
2824 prompt = f"""
2825You are an agent that generates questions from provided text.
2826Instructions:
2827- For provided text, generate {num_questions} questions that can be answered solely by the facts in the text.
2828- Return questions in a list format. Example: ["Question 1", "Question 2", "Question 3"]
2829
2830### Context ###
2831```
2832{context_seed}
2833```
2834"""
2835 messages = [
2836 {"role": "user", "content": prompt},
2837 ]
2838 sampling_params = {
2839 "temperature": 1.0,
2840 "stop": ["?"],
2841 "max_tokens": 512,
2842 "stream": False,
2843 }
2844 text = await self._make_api_call(messages, sampling_params)
2845 questions = self.extract_questions(text)
2846 if not questions:
2847 print(text)
2848 answers = []
2849 for question in questions:
2850 sampling_params = {"temperature": 0.4, "max_tokens": 1024, "stream": False}
2851 text = await self._make_api_call(
2852 [
2853 {
2854 "role": "user",
2855 "content": f"{context_seed}\n\nBased on above context, answer the question concisely: {question}",
2856 }
2857 ],
2858 sampling_params,
2859 )
2860 answers.append(text)
2861 total_chars = len(context_seed)
2862 for q, a in zip(questions, answers):
2863 total_chars += len(q) + len(a)
2864 return questions, answers, total_chars
2865
2866 def extract_questions(self, completion: str):
2867 # Extract based on format ["Question 1", "Question 2", "Question 3"]
2868 start_list = completion.find("[")
2869 end_list = completion.find("]")
2870 questions = completion[start_list + 1 : end_list]
2871 questions = eval(questions)
2872 return questions
2873
2874
2875
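The extraction step in `extract_questions` can be sketched as follows; note that `ast.literal_eval` is substituted for the `eval` call used above purely for this illustration.

```python
# Sketch of the bracket-extraction step in extract_questions. Substituting
# ast.literal_eval for the eval call used above is an editorial choice for
# this illustration only.
import ast

completion = 'Sure: ["What is X?", "When did Y happen?", "Who wrote Z?"]'

start_list = completion.find("[")
end_list = completion.find("]")
questions = ast.literal_eval(completion[start_list : end_list + 1])
print(questions)  # ['What is X?', 'When did Y happen?', 'Who wrote Z?']
```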
2876---
2877File: /neural_condense_core/validator_utils/synthesizing/filter_chunker.py
2878---
2879
2880from datasets import load_dataset
2881from typing import Tuple
2882import random
2883from semantic_text_splitter import TextSplitter
2884
2885
2886class FilterExistanceChecker:
2887 def __init__(self, chunk_size: int = 256):
2888 self.splitter = TextSplitter(chunk_size)
2889 self.negative_dataset = self._load_negative_dataset()
2890
2891 def _load_negative_dataset(self):
2892 negative_dataset = load_dataset(
2893 "TIGER-Lab/Fineweb-Instruct", streaming=True, split="train"
2894 )
2895 negative_dataset = negative_dataset.shuffle()
2896 negative_dataset = negative_dataset.filter(lambda x: len(x["response"]) > 1024)
2897 negative_dataset = negative_dataset.map(lambda x: {"text": x["response"]})
2898 negative_dataset = iter(negative_dataset)
2899 return negative_dataset
2900
2901 def _get_negative_message(self):
2902 try:
2903 return next(self.negative_dataset)["text"]
2904 except StopIteration:
2905 self.negative_dataset = self._load_negative_dataset()
2906 return self._get_negative_message()
2907
2908 def get_chunks(self, context: str) -> Tuple[str, str]:
2909 # Test on positive case (text from conversation)
2910 chunks = self.splitter.chunks(context)
2911 # Get random 2 chunks
2912 positive_chunks = random.sample(chunks, 2)
2913 # Test on negative case (text not from conversation)
2914 negative_chunks = random.sample(
2915 self.splitter.chunks(self._get_negative_message()), 2
2916 )
2917 return positive_chunks, negative_chunks
2918
2919
2920
2921---
2922File: /neural_condense_core/validator_utils/synthesizing/scheduler.py
2923---
2924
2925import redis
2926from typing import Optional, List
2927import random
2928from datasets import load_dataset
2929from .convo_generator import ConvoGenerator
2930from .custom_dataset_loaders import load_context_datasets
2931from .schemas import (
2932 QASet,
2933 QASchedulerConfig,
2934)
2935from ...constants import constants
2936import time
2937from ...logger import logger
2938import asyncio
2939
2940
2941class Scheduler:
2942 def __init__(
2943 self,
2944 generator: ConvoGenerator,
2945 qa_config: QASchedulerConfig,
2946 refresh_time: float = 10.0,
2947 ):
2948 self.generator = generator
2949 self.qa_config = qa_config
2950 self.refresh_time = refresh_time
2951 self.context_datasets = load_context_datasets()
2952 redis_config = constants.DATABASE_CONFIG.redis
2953 self.redis = redis.Redis(
2954 host=redis_config.host,
2955 port=redis_config.port,
2956 db=redis_config.db,
2957 decode_responses=True,
2958 )
2959 self.qa_key = "qa_sets"
2960 self._prefill_queues()
2961 self.running = False
2962 self.loop = asyncio.get_event_loop()
2963
2964 def _prefill_queues(self):
2965 self.redis.delete(self.qa_key)
2966 cached_qa_ds = load_dataset(
2967 "Condense-AI/subnet-synthetic-dataset-v0.2", name="QA", split="train"
2968 )
2969 for qa_set in cached_qa_ds:
2970 item = QASet(**qa_set)
2971 self.redis.sadd(self.qa_key, item.model_dump_json())
2972
2973 logger.info(f"βœ… Prefilled QA: {self.redis.scard(self.qa_key)} items.")
2974
2975 def _get_next_context_seed(self):
2976 ds = random.choice(self.context_datasets)
2977 item = next(ds)
2978 return item["context"]
2979
2980 async def _refresh_qa_queue(self):
2981 while self.running:
2982 if self.redis.scard(self.qa_key) < self.qa_config.max_items:
2983 try:
2984 context_seed = self._get_next_context_seed()
2985 (
2986 questions,
2987 answers,
2988 total_chars,
2989 ) = await self.generator.generate_qa_pairs(
2990 context_seed, num_questions=self.qa_config.n_qa_per_context
2991 )
2992 qa_set = QASet(
2993 questions=questions,
2994 answers=answers,
2995 total_chars=total_chars,
2996 context_seed=context_seed,
2997 )
2998 self.redis.sadd(self.qa_key, qa_set.model_dump_json())
2999 current_time = time.strftime("%H:%M:%S")
3000 logger.info(
3001 f"βœ… QA Set: {self.redis.scard(self.qa_key)} - last_time: {current_time} - {total_chars} chars"
3002 )
3003 except Exception as e:
3004 logger.warning(f"❌ Error generating QA set: {e}")
3005 else:
3006 self.redis.spop(self.qa_key)
3007 await asyncio.sleep(self.refresh_time)
3008
3009 def start(self):
3010 self.running = True
3011 self.loop.create_task(self._refresh_qa_queue())
3012
3013 def stop(self):
3014 self.running = False
3015
3016 async def get_qas(self, n: int = 1) -> Optional[List[QASet]]:
3017 items = self.redis.srandmember(self.qa_key, n)
3018 return [QASet.model_validate_json(item) for item in items] if items else None
3019
3020
3021
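The scheduler leans on four Redis set operations: `sadd` to enqueue serialized QA sets, `scard` to check the queue size, `srandmember` to sample without removal, and `spop` to evict. The sketch below exercises them directly and assumes a local Redis server on the default port.

```python
# Direct exercise of the Redis set operations the scheduler relies on.
# Assumes a local Redis server on the default port, purely for illustration.
import json
import redis

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
qa_key = "qa_sets_demo"  # throwaway key, not the scheduler's "qa_sets"

r.delete(qa_key)
r.sadd(qa_key, json.dumps({
    "questions": ["What is discussed?"],
    "answers": ["An example."],
    "total_chars": 30,
    "context_seed": "An example context seed.",
}))
print(r.scard(qa_key))           # 1 item queued
print(r.srandmember(qa_key, 1))  # sampled without removing it, as get_qas does
r.spop(qa_key)                   # evict one item, as the refresh loop does when full
```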
3022---
3023File: /neural_condense_core/validator_utils/synthesizing/schemas.py
3024---
3025
3026from pydantic import BaseModel, Field
3027from typing import List
3028from pydantic import validator
3029
3030
3031class QASchedulerConfig(BaseModel):
3032 n_qa_per_context: int = Field(
3033 ..., description="Number of QA pairs to generate per context"
3034 )
3035 max_items: int = Field(..., description="Maximum number of items to generate")
3036
3037
3038class ConversationSchedulerConfig(BaseModel):
3039 n_new_conversations: int = Field(
3040 ..., description="Number of new conversations to generate"
3041 )
3042 n_previous_conversations: int = Field(
3043 ..., description="Number of previous conversations to keep from the dataset"
3044 )
3045 max_items: int = Field(..., description="Maximum number of items to generate")
3046
3047
3048class Message(BaseModel):
3049 role: str = Field(..., description="Role of the message sender (user or assistant)")
3050 content: str = Field(..., description="Content of the message")
3051
3052
3053class QASet(BaseModel):
3054 questions: List[str] = Field(..., description="List of generated questions")
3055 answers: List[str] = Field(..., description="List of corresponding answers")
3056 total_chars: int = Field(
3057 ..., description="Total character count of questions and answers"
3058 )
3059 context_seed: str = Field(
3060 ..., description="Original context used to generate QA pairs"
3061 )
3062
3063 @validator("questions", "answers")
3064 def validate_questions_and_answers(cls, v):
3065 if not v:
3066 raise ValueError("Questions and answers must be non-empty lists")
3067 return v
3068
3069
3070class Conversation(BaseModel):
3071 messages: List[Message] = Field(..., description="List of conversation messages")
3072 total_chars: int = Field(
3073 ..., description="Total character count of the conversation"
3074 )
3075 messages_seed: List[Message] = Field(
3076 ..., description="Original messages used to generate conversation"
3077 )
3078
3079 @validator("messages")
3080 def validate_messages(cls, v):
3081 if not v:
3082 raise ValueError("Messages must be non-empty lists")
3083 return v
3084
3085
3086
3087---
3088File: /neural_condense_core/validator_utils/synthesizing/utils.py
3089---
3090
3091# Define retry decorator
3092def retry(max_attempts=3):
3093 def decorator(func):
3094 def wrapper(*args, **kwargs):
3095 attempts = 0
3096 while attempts < max_attempts:
3097 try:
3098 return func(*args, **kwargs)
3099 except Exception as e:
3100 attempts += 1
3101 if attempts == max_attempts:
3102 raise e
3103 return None
3104
3105 return wrapper
3106
3107 return decorator
3108
3109
3110
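The decorator above is synchronous, yet in `challenge_generator.py` it wraps coroutine methods, so exceptions raised when the coroutine is awaited are not retried. An async-aware variant would look like the sketch below; this is an editorial illustration, not code that exists in the repository.

```python
# Async-aware variant of the retry decorator, shown for illustration. The
# version above returns func(*args, **kwargs) without awaiting it, so when it
# decorates a coroutine method the exception surfaces at await time and is
# never retried.
import asyncio
import functools


def retry_async(max_attempts: int = 3):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
        return wrapper
    return decorator


@retry_async(max_attempts=3)
async def flaky():
    raise RuntimeError("transient failure")


try:
    asyncio.run(flaky())
except RuntimeError as e:
    print(f"gave up after 3 attempts: {e}")
```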
3111---
3112File: /neural_condense_core/validator_utils/__init__.py
3113---
3114
3115from . import loop
3116from . import synthesizing
3117from . import managing
3118from . import monetize
3119
3120__all__ = [
3121 "loop",
3122 "synthesizing",
3123 "managing",
3124 "monetize",
3125]
3126
3127
3128
3129---
3130File: /neural_condense_core/__init__.py
3131---
3132
3133from . import base
3134from . import validator_utils
3135from . import miner_utils
3136from . import protocol
3137from . import common
3138from .constants import constants
3139from .logger import logger
3140
3141__version__ = "0.0.3"
3142version_split = __version__.split(".")
3143__spec_version__ = (
3144 (1000 * int(version_split[0]))
3145 + (10 * int(version_split[1]))
3146 + (1 * int(version_split[2]))
3147)
3148
3149__all__ = [
3150 "base",
3151 "validator_utils",
3152 "miner_utils",
3153 "protocol",
3154 "common",
3155 "constants",
3156 "logger",
3157]
3158
3159
3160
3161---
3162File: /neural_condense_core/constants.py
3163---
3164
3165from pydantic import BaseModel, Field
3166from typing import List, Dict
3167import os
3168
3169
3170class ChatTemplate(BaseModel):
3171 # Example of Mistral-7B-Instruct-v0.2
3172 bos_token: str = "<s>"
3173 eos_token: str = "</s>"
3174 user_start_token: str = "[INST]"
3175 user_end_token: str = "[/INST]"
3176 assistant_start_token: str = "" # No specific start token for the assistant
3177 assistant_end_token: str = "" # No specific end token for the assistant
3178
3179 def apply_context_template(self, context: str):
3180 return f"{self.bos_token} {self.user_start_token} {context}"
3181
3182 def apply_question_template(self, question: str):
3183 return f"\n\n{question} {self.user_end_token} {self.assistant_start_token}"
3184
3185
3186class TierConfig(BaseModel):
3187 incentive_percentage: float
3188 requests_per_epoch: int
3189 timeout: int
3190 supporting_models: List[str]
3191 max_condensed_tokens: int
3192 min_condensed_tokens: int
3193 max_compress_rate: float = 1.0
3194 min_compress_rate: float = 0.1
3195 max_context_length_in_chars: int
3196 accelerate_reward_scalar: float
3197
3198
3199class SyntheticTaskConfig(BaseModel):
3200 task: str
3201 criterias: List[str]
3202 rewarding_frequency: int
3203 weight: float
3204
3205
3206class RedisConfig(BaseModel):
3207 """Configuration for Redis connection"""
3208
3209 host: str = Field(default="localhost")
3210 port: int = Field(default=6379)
3211 db: int = Field(default=0)
3212 expire_time: int = Field(
3213 default=3600, description="Default expiration time in seconds"
3214 )
3215 serving_counter_key_format: str = Field(default="serving_counter:{tier}:{uid}")
3216
3217
3218class SqlConfig(BaseModel):
3219 """Configuration for SQL database connection"""
3220
3221 url: str = Field(
3222 default="sqlite:///miner_metadata.db",
3223 description="Database URL in SQLAlchemy format",
3224 )
3225
3226
3227class DatabaseConfig(BaseModel):
3228 """Combined database configuration"""
3229
3230 redis: RedisConfig = Field(default_factory=RedisConfig)
3231 sql: SqlConfig = Field(default_factory=SqlConfig)
3232
3233
3234class Constants(BaseModel):
3235 TIER_CONFIG: dict[str, TierConfig] = {
3236 "research": TierConfig(
3237 incentive_percentage=0.6,
3238 requests_per_epoch=256,
3239 timeout=32,
3240 accelerate_reward_scalar=0.1,
3241 supporting_models=["Condense-AI/Mistral-7B-Instruct-v0.2"],
3242 max_condensed_tokens=1536,
3243 min_condensed_tokens=128,
3244 max_context_length_in_chars=15000,
3245 ),
3246 "universal": TierConfig(
3247 incentive_percentage=0.4,
3248 requests_per_epoch=256,
3249 timeout=16,
3250 accelerate_reward_scalar=0.1,
3251 supporting_models=["unsloth/Meta-Llama-3.1-8B-Instruct"],
3252 max_condensed_tokens=-1,
3253 min_condensed_tokens=-1,
3254 max_context_length_in_chars=15000,
3255 max_compress_rate=0.8,
3256 min_compress_rate=0.1,
3257 ),
3258 }
3259
3260 SYNTHETIC_TASK_CONFIG: List[SyntheticTaskConfig] = [
3261 SyntheticTaskConfig(
3262 task="causal_conversation",
3263 criterias=["perplexity"],
3264 rewarding_frequency=1,
3265 weight=0,
3266 ),
3267 SyntheticTaskConfig(
3268 task="question_answering",
3269 criterias=["accuracy"],
3270 rewarding_frequency=1,
3271 weight=1,
3272 ),
3273 SyntheticTaskConfig(
3274 task="reconstruct_conversation",
3275 criterias=["perplexity"],
3276 rewarding_frequency=1,
3277 weight=0,
3278 ),
3279 SyntheticTaskConfig(
3280 task="trivial_qa_conversation",
3281 criterias=["accuracy"],
3282 rewarding_frequency=1,
3283 weight=0,
3284 ),
3285 ]
3286
3287 # Default values
3288 EPOCH_LENGTH: int = 600
3289 SCORING_PER_MINER_PER_EPOCH: int = 1
3290 SUBNET_TEMPO: int = 360
3291 MIN_STAKE: int = int(os.environ.get("MIN_STAKE", 10000))
3292 RPE_PERCENTAGE_FOR_SYNTHETIC: float = 0.1
3293 BATCH_SIZE: int = 8
3294 SET_WEIGHTS_TIMEOUT: int = 120
3295 ORGANIC_CLIENT_URL: str = "https://ncs-client.condenses.ai"
3296 REPORT_URL: str = "https://report.condenses.ai"
3297 ORGANIC_VERIFY_FREQUENCY: float = 0.1
3298 TOP_PERCENTAGE_FOR_ALLOCATING_WEIGHTS: float = 1.0
3299
3300 DATABASE_CONFIG: DatabaseConfig = Field(
3301 default_factory=lambda: DatabaseConfig(
3302 redis=RedisConfig(
3303 host=os.getenv("REDIS_HOST", "localhost"),
3304 port=int(os.getenv("REDIS_PORT", 6379)),
3305 db=int(os.getenv("REDIS_DB", 0)),
3306 expire_time=int(os.getenv("REDIS_EXPIRE_TIME", 3600)),
3307 ),
3308 sql=SqlConfig(
3309 url=os.getenv("SQL_DATABASE_URL", "sqlite:///miner_metadata.db")
3310 ),
3311 )
3312 )
3313
3314 CHAT_TEMPLATES: Dict[str, ChatTemplate] = {
3315 "Mistral-7B-Instruct-v0.2": ChatTemplate(
3316 bos_token="<s>",
3317 eos_token="</s>",
3318 user_start_token="[INST]",
3319 user_end_token="[/INST]",
3320 assistant_start_token="",
3321 assistant_end_token="",
3322 ),
3323 }
3324
3325 # Adjust values based on NETWORK environment variable
3326 def __init__(self, **data):
3327 super().__init__(**data)
3328 network = os.getenv("NETWORK")
3329 if network == "test":
3330 self.RPE_PERCENTAGE_FOR_SYNTHETIC = float(
3331 os.getenv("RPE_PERCENTAGE_FOR_SYNTHETIC", 0.5)
3332 )
3333 self.EPOCH_LENGTH = int(os.getenv("EPOCH_LENGTH", 600))
3334 self.MIN_STAKE = int(os.getenv("MIN_STAKE", 0))
3335 self.ORGANIC_CLIENT_URL = os.getenv(
3336 "ORGANIC_CLIENT_URL", "https://testnet-ncs-client.condenses.ai"
3337 )
3338 self.REPORT_URL = os.getenv(
3339 "REPORT_URL", "https://testnet-report.condenses.ai"
3340 )
3341
3342
3343constants = Constants()
3344
3345if __name__ == "__main__":
3346 import rich
3347
3348 for k, v in constants.model_dump().items():
3349 rich.print(f"- {k}: {v}")
3350
3351
3352
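A worked example of the Mistral-7B-Instruct template configured in `CHAT_TEMPLATES`, showing the strings produced by `apply_context_template` and `apply_question_template` for those tokens.

```python
# Worked example of the Mistral-7B-Instruct template in CHAT_TEMPLATES,
# reproducing what apply_context_template and apply_question_template
# return for the configured tokens.
bos, user_start, user_end, assistant_start = "<s>", "[INST]", "[/INST]", ""

context = "Long document to be condensed ..."
question = "What is discussed above?"

formatted_context = f"{bos} {user_start} {context}"
formatted_question = f"\n\n{question} {user_end} {assistant_start}"

print(formatted_context)
# <s> [INST] Long document to be condensed ...
print(formatted_context + formatted_question)
# <s> [INST] Long document to be condensed ...
#
# What is discussed above? [/INST]
```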
3353---
3354File: /neural_condense_core/executor.py
3355---
3356
3357from concurrent.futures import ThreadPoolExecutor
3358
3359THREAD_POOL_SIZE: int = 8
3360THREAD_POOL: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
3361
3362
3363
3364---
3365File: /neural_condense_core/logger.py
3366---
3367
3368import structlog
3369
3370logger = structlog.get_logger()
3371
3372
3373
3374---
3375File: /neural_condense_core/protocol.py
3376---
3377
3378import re
3379from bittensor import Synapse
3380from typing import Any, List
3381import torch
3382from transformers import DynamicCache
3383from pydantic import BaseModel
3384from .common.file import load_npy_from_url
3385from .constants import TierConfig
3386import numpy as np
3387import io
3388from neural_condense_core.logger import logger
3389
3390
3391class Metadata(Synapse):
3392 metadata: dict = {}
3393
3394
3395class TaskData(BaseModel):
3396 formatted_context: str = ""
3397 original_context: str = ""
3398 challenge_questions: List[str] = []
3399 challenge_answers: List[str] = []
3400 formatted_questions: List[str] = []
3401 negative_chunks: List[str] = []
3402 positive_chunks: List[str] = []
3403
3404
3405class UtilData(BaseModel):
3406 compressed_kv_b64: str = ""
3407 compressed_length: int = 0
3408 download_time: float = 0.0
3409 bonus_compress_size: float = 0.0
3410 bonus_time: float = 0.0
3411 local_filename: str = ""
3412
3413
3414class MinerResponse(BaseModel):
3415 filename: str = ""
3416 compressed_context: str = ""
3417
3418
3419class BatchedScoringRequest(BaseModel):
3420 miner_responses: List[MinerResponse] = []
3421 task_data: TaskData = TaskData()
3422 target_model: str = ""
3423 criterias: List[str] = []
3424
3425
3426class TextCompressProtocol(Synapse):
3427 context: str = ""
3428 target_model: str = ""
3429 tier: str = ""
3430 compressed_kv_url: str = ""
3431 compressed_context: str = ""
3432 util_data: UtilData = UtilData()
3433 task_data: TaskData = TaskData()
3434
3435 @property
3436 def accelerate_score(self) -> float:
3437 return (self.util_data.bonus_compress_size + self.bonus_time) / 2
3438
3439 @property
3440 def bonus_time(self) -> float:
3441 if self.tier == "universal":
3442 return 1 - min(1, self.dendrite.process_time / self.timeout)
3443 else:
3444 return 1 - min(
3445 1,
3446 (self.dendrite.process_time + self.util_data.download_time)
3447 / self.timeout,
3448 )
3449
3450 @property
3451 def miner_payload(self) -> dict:
3452 return {"context": self.context, "target_model": self.target_model}
3453
3454 @property
3455 def miner_synapse(self, is_miner: bool = False):
3456 return TextCompressProtocol(
3457 **self.model_dump(include={"context", "target_model"})
3458 )
3459
3460 @property
3461 def validator_payload(self) -> dict:
3462 return {
3463 "task_data": self.task_data.model_dump(),
3464 "util_data": self.util_data.model_dump(),
3465 "tier": self.tier,
3466 }
3467
3468 @staticmethod
3469 def get_scoring_payload(
3470 responses: List["TextCompressProtocol"],
3471 ground_truth_synapse: "TextCompressProtocol",
3472 target_model: str,
3473 criterias: List[str],
3474 ) -> BatchedScoringRequest:
3475 if ground_truth_synapse.tier != "universal":
3476 return BatchedScoringRequest(
3477 miner_responses=[
3478 {"filename": r.util_data.local_filename} for r in responses
3479 ],
3480 task_data=ground_truth_synapse.task_data,
3481 target_model=target_model,
3482 criterias=criterias,
3483 )
3484 else:
3485 return BatchedScoringRequest(
3486 miner_responses=[
3487 {"compressed_context": r.compressed_context} for r in responses
3488 ],
3489 task_data=ground_truth_synapse.task_data,
3490 target_model=target_model,
3491 criterias=criterias,
3492 )
3493
3494 @staticmethod
3495 async def verify(
3496 response: "TextCompressProtocol",
3497 tier_config: TierConfig,
3498 tier: str,
3499 tokenizer=None,
3500 ground_truth_synapse: "TextCompressProtocol" = None,
3501 ) -> tuple[bool, str]:
3502 print(f"Verifying tier: {tier}")
3503 if tier == "universal":
3504 condensed_tokens = tokenizer.encode(response.compressed_context)
3505 original_tokens = tokenizer.encode(ground_truth_synapse.context)
3506 n_condensed_tokens = len(condensed_tokens)
3507 compress_rate = n_condensed_tokens / len(original_tokens)
3508 logger.info(f"Compress rate: {compress_rate}")
3509 if not (
3510 tier_config.min_compress_rate
3511 <= compress_rate
3512 <= tier_config.max_compress_rate
3513 ):
3514 return (
3515 False,
3516 f"Compressed tokens are not within the expected range. {compress_rate}. Valid range: {tier_config.min_compress_rate} to {tier_config.max_compress_rate}",
3517 )
3518
3519 response.util_data.bonus_compress_size = 1 - compress_rate
3520 response.util_data.compressed_length = n_condensed_tokens
3521 return True, ""
3522 else:
3523 if not re.match(r"^https?://.*\.npy$", response.compressed_kv_url):
3524 return False, "Compressed KV URL must use HTTP or HTTPS."
3525
3526 compressed_kv, filename, download_time, error = await load_npy_from_url(
3527 response.compressed_kv_url
3528 )
3529 response.util_data.download_time = download_time
3530 response.util_data.local_filename = filename
3531 if compressed_kv is None:
3532 return (
3533 False,
3534 f"Failed to load url: {error}. {download_time} seconds. {filename}",
3535 )
3536 try:
3537 tensor = torch.from_numpy(compressed_kv)
3538 kv_cache = DynamicCache.from_legacy_cache(tensor)
3539 except Exception as e:
3540 return False, f"{error} -> {str(e)}"
3541
3542 if not (
3543 tier_config.min_condensed_tokens
3544 <= kv_cache._seen_tokens
3545 <= tier_config.max_condensed_tokens
3546 ):
3547 return False, "Compressed tokens are not within the expected range."
3548
3549 response.util_data.bonus_compress_size = 1 - (
3550 kv_cache._seen_tokens / tier_config.max_condensed_tokens
3551 )
3552 response.util_data.compressed_length = kv_cache._seen_tokens
3553 del kv_cache
3554 del compressed_kv
3555 return True, ""
3556
3557
3558
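A worked example of the `accelerate_score` arithmetic for a hypothetical research-tier response; every number below is invented for illustration, with the 32-second timeout and 1536-token cap mirroring the research tier config.

```python
# Worked example of the accelerate_score arithmetic for a hypothetical
# research-tier response. All numbers are invented; only the 32-second
# timeout and the 1536-token cap mirror the research tier config.
timeout = 32.0            # research-tier timeout
process_time = 8.0        # seconds reported by the dendrite
download_time = 4.0       # seconds spent fetching the .npy file
compressed_tokens = 512
max_condensed_tokens = 1536

bonus_time = 1 - min(1.0, (process_time + download_time) / timeout)  # 0.625
bonus_compress_size = 1 - compressed_tokens / max_condensed_tokens   # ~0.667
accelerate_score = (bonus_compress_size + bonus_time) / 2            # ~0.646
print(round(bonus_time, 3), round(bonus_compress_size, 3), round(accelerate_score, 3))
```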
3559---
3560File: /neurons/miner.py
3561---
3562
3563import neural_condense_core as ncc
3564import httpx
3565from typing import Tuple
3566import bittensor as bt
3567import time
3568import traceback
3569import redis
3570
3571
3572class Miner(ncc.base.BaseMiner):
3573 def __init__(self):
3574 super().__init__()
3575 self.blacklist_fns.append(self.blacklist_fn)
3576 self.forward_fns.append(self.forward_text_compress)
3577 self.redis_config = ncc.constants.DATABASE_CONFIG.redis
3578 self.redis = redis.Redis(
3579 host=self.redis_config.host,
3580 port=self.redis_config.port,
3581 db=self.redis_config.db,
3582 decode_responses=True,
3583 )
3584 self.setup_logging()
3585 self._initialize_rate_limits()
3586
3587 def _initialize_rate_limits(self):
3588 r"""
3589 Initializes the rate limits for the miners.
3590 """
3591 self.rate_limits = {
3592 uid: ncc.validator_utils.managing.ServingCounter(
3593 rate_limit=rate_limit,
3594 uid=uid,
3595 tier=self.config.miner.tier,
3596 redis_client=self.redis,
3597 postfix_key=self.config.axon.port,
3598 )
3599 for uid, rate_limit in ncc.common.build_rate_limit(
3600 self.metagraph, self.config, tier=self.config.miner.tier
3601 ).items()
3602 }
3603 for k, v in self.rate_limits.items():
3604 v.reset_counter()
3605 bt.logging.info(
3606 f"Reset rate limit for {k}: {v.get_current_count()}/{v.rate_limit}"
3607 )
3608
3609 def run(self):
3610 self.setup_axon()
3611 bt.logging.info("Starting main loop")
3612 step = 0
3613 while True:
3614 try:
3615 # Periodically update our knowledge of the network graph.
3616 if step % 60 == 0:
3617 self.metagraph.sync()
3618 self._initialize_rate_limits()
3619 log = (
3620 f"Block: {self.metagraph.block.item()} | "
3621 f"Incentive: {self.metagraph.I[self.my_subnet_uid]} | "
3622 )
3623 bt.logging.info(log)
3624 step += 1
3625 time.sleep(10)
3626
3627 except KeyboardInterrupt:
3628 self.axon.stop()
3629 bt.logging.success("Miner killed by keyboard interrupt.")
3630 break
3631 except Exception as e:
3632 bt.logging.error(f"Miner exception: {e}")
3633 bt.logging.error(traceback.format_exc())
3634 continue
3635
3636 async def forward_text_compress(
3637 self, synapse: ncc.protocol.TextCompressProtocol
3638 ) -> ncc.protocol.TextCompressProtocol:
3639 r"""
3640 Forward function for the Text-Compress task.
3641 Args:
3642 synapse (TextCompressProtocol): The synapse containing the text to compress.
3643 Returns:
3644 synapse (TextCompressProtocol): The synapse containing the compressed tokens.
3645 """
3646 bt.logging.info(
3647 f"Forwarding text compress: {synapse.context[:100]}...{synapse.context[-100:]}"
3648 )
3649 bt.logging.info(f"Context length: {len(synapse.context)}")
3650
3651 payload = synapse.miner_payload
3652
3653 async with httpx.AsyncClient(timeout=synapse.timeout) as client:
3654 response = await client.post(
3655 f"http://{self.config.miner.backend_host}:{self.config.miner.backend_port}/condense",
3656 json=payload,
3657 )
3658 response = response.json()
3659
3660 if self.config.miner.tier == "universal":
3661 synapse.compressed_context = response["compressed_context"]
3662 return synapse
3663
3664 elif self.config.miner.tier == "research":
3665 compressed_kv_url = response["compressed_kv_url"]
3666 bt.logging.info(f"Compressed & uploaded to {compressed_kv_url}")
3667 return ncc.protocol.TextCompressProtocol(
3668 compressed_kv_url=compressed_kv_url
3669 )
3670
3671 def blacklist_fn(
3672 self, synapse: ncc.protocol.TextCompressProtocol
3673 ) -> Tuple[bool, str]:
3674 r"""
3675 Blacklist function for the Text-Compress task.
3676 Args:
3677 synapse (TextCompressProtocol): The synapse containing the text to compress.
3678 Returns:
3679 bool: Whether to blacklist the synapse.
3680 reason (str): The reason for blacklisting the synapse.
3681 """
3682 hotkey = synapse.dendrite.hotkey
3683 uid = self.metagraph.hotkeys.index(hotkey)
3684 stake = self.metagraph.S[uid]
3685 if stake < ncc.constants.MIN_STAKE:
3686 return True, "Stake too low."
3687 allowed = self.rate_limits[uid].increment()
3688 bt.logging.info(
3689 f"Rate limit: {uid} {self.rate_limits[uid].get_current_count()}/{self.rate_limits[uid].rate_limit}"
3690 )
3691 if not allowed:
3692 return True, "Rate limit exceeded."
3693 return False, ""
3694
3695
3696if __name__ == "__main__":
3697 miner = Miner()
3698 miner.run()
3699
3700
3701
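The miner above delegates compression to a local backend via `POST /condense`, sending `miner_payload` and reading either `compressed_context` (universal tier) or `compressed_kv_url` (research tier) from the reply. The FastAPI stub below is a hypothetical placeholder that satisfies the universal-tier side of that contract, not the subnet's reference backend.

```python
# Hypothetical FastAPI stub satisfying the universal-tier side of the
# /condense contract: it accepts the miner_payload fields and returns a
# "compressed_context" key. A research-tier backend would instead return a
# "compressed_kv_url" pointing at an uploaded .npy file.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class CondenseRequest(BaseModel):
    context: str
    target_model: str = ""


@app.post("/condense")
def condense(request: CondenseRequest):
    # Trivial "compression" so the stub is runnable end to end.
    return {"compressed_context": request.context[: len(request.context) // 2]}

# Run with, e.g.:  uvicorn backend_stub:app --port 8080   (module name is hypothetical)
# and point config.miner.backend_host / backend_port at it.
```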
3702---
3703File: /neurons/validator.py
3704---
3705
3706from neural_condense_core import (
3707 base,
3708 validator_utils as vutils,
3709 constants,
3710 logger,
3711)
3712from neural_condense_core.common import clean_tmp_directory
3713from neural_condense_core.protocol import TextCompressProtocol
3714import pandas as pd
3715import bittensor as bt
3716import random
3717from transformers import AutoTokenizer
3718import traceback
3719import asyncio
3720from concurrent.futures import ThreadPoolExecutor
3721import time
3722
3723
3724class Validator(base.BaseValidator):
3725 def __init__(self):
3726 super().__init__()
3727 self.miner_manager = vutils.managing.MinerManager(
3728 uid=self.uid,
3729 wallet=self.wallet,
3730 metagraph=self.metagraph,
3731 config=self.config,
3732 )
3733 self.challenge_generator = vutils.synthesizing.ChallengeGenerator(
3734 keypair=self.dendrite.keypair
3735 )
3736
3737 if self.config.validator.use_wandb:
3738 vutils.loop.initialize_wandb(self.dendrite, self.metagraph, self.uid)
3739
3740 self.set_weights_executor = ThreadPoolExecutor(max_workers=1)
3741 self.metadata_task = None
3742 self.metadata_interval = 600 # 10 minutes in seconds
3743
3744 async def start_epoch(self):
3745 clean_tmp_directory()
3746 logger.info("Running epoch.")
3747 await self.miner_manager.sync()
3748
3749 tasks = [
3750 self.loop.create_task(self._forward_tier(tier))
3751 for tier in constants.TIER_CONFIG
3752 ]
3753 try:
3754 await asyncio.gather(*tasks, return_exceptions=True)
3755 except asyncio.TimeoutError as e:
3756 logger.warning(f"Epoch tasks timed out: {e}")
3757 except Exception as e:
3758 logger.error(f"Error running epoch tasks: {e}")
3759 traceback.print_exc()
3760
3761 async def report_metadata(self):
3762 try:
3763 await self.miner_manager.report_metadata()
3764 logger.info("Reported metadata successfully")
3765 except Exception as e:
3766 logger.error(f"Failed to report metadata: {e}")
3767
3768 async def _forward_tier(self, tier: str):
3769 try:
3770 if constants.TIER_CONFIG[tier].incentive_percentage == 0:
3771 logger.info(f"Tier {tier} has no incentive percentage.")
3772 return
3773 model_name = random.choice(constants.TIER_CONFIG[tier].supporting_models)
3774 tokenizer = AutoTokenizer.from_pretrained(model_name)
3775 serving_counter = self.miner_manager.serving_counter.get(tier, {})
3776
3777 if not serving_counter:
3778 logger.info(f"No miners in tier {tier}.")
3779 return
3780 rate_limit = self.miner_manager.rate_limit_per_tier[tier]
3781 n_sets = max(
3782 int(rate_limit * constants.RPE_PERCENTAGE_FOR_SYNTHETIC),
3783 1,
3784 )
3785 futures = []
3786 except Exception as e:
3787 logger.error(f"Error in _forward_tier: {e}")
3788 traceback.print_exc()
3789 return
3790
3791 task_config = vutils.loop.get_task_config()
3792 model_name = random.choice(constants.TIER_CONFIG[tier].supporting_models)
3793 tokenizer = AutoTokenizer.from_pretrained(model_name)
3794 ground_truth_synapses = [
3795 await vutils.loop.prepare_synapse(
3796 challenge_generator=self.challenge_generator,
3797 tier=tier,
3798 task_config=task_config,
3799 tier_config=constants.TIER_CONFIG[tier],
3800 model_name=model_name,
3801 )
3802 for _ in range(n_sets)
3803 ]
3804
3805 sleep_per_set = constants.EPOCH_LENGTH / n_sets
3806
3807 logger.info(f"Prepared {len(ground_truth_synapses)} ground truth synapses.")
3808
3809 for i, ground_truth_synapse in enumerate(ground_truth_synapses):
3810 logger.info(
3811 f"Processing set {i}/{n_sets} then sleeping for {sleep_per_set} seconds."
3812 )
3813 total_uids = list(serving_counter.keys())
3814 logger.info(f"Total uids: {total_uids}")
3815 random.shuffle(total_uids)
3816 batched_uids = [total_uids[i : i + 4] for i in range(0, len(total_uids), 4)]
3817
3818 for uids in batched_uids:
3819 start_time = time.time()
3820 # Wait if we have too many pending futures
3821 pending_futures = [f for f in futures if not f.done()]
3822 while len(pending_futures) >= 10:
3823 logger.info(
3824 f"Waiting for {len(pending_futures)} futures to complete."
3825 )
3826 await asyncio.sleep(1)
3827 # Clean up completed futures
3828 futures = [f for f in futures if not f.done()]
3829 pending_futures = [f for f in futures if not f.done()]
3830
3831 logger.info(
3832 "Processing batch",
3833 uids=uids,
3834 sleep=sleep_per_set / len(batched_uids),
3835 )
3836 future = self.loop.create_task(
3837 self._forward_batch(
3838 tier,
3839 model_name,
3840 uids,
3841 ground_truth_synapse,
3842 task_config,
3843 tokenizer,
3844 )
3845 )
3846 futures.append(future)
3847 await asyncio.sleep(sleep_per_set / len(batched_uids))
3848 await asyncio.gather(*futures, return_exceptions=True)
3849 logger.info(f"Finished processing batch {i}/{n_sets}.")
3850 logger.info(f"Setting weights for batch {i}/{n_sets}.")
3851 await self.report_metadata()
3852 self.set_weights()
3853 logger.info("Finished processing all batches.")
3854
3855 async def _forward_batch(
3856 self,
3857 tier: str,
3858 model_name: str,
3859 batched_uids: list[int],
3860 ground_truth_synapse: TextCompressProtocol,
3861 task_config,
3862 tokenizer=None,
3863 ):
3864 try:
3865 dendrite = bt.dendrite(self.wallet)
3866 synapse = ground_truth_synapse.miner_synapse
3867 logger.info(f"Querying miners {batched_uids}.")
3868 responses = await vutils.loop.query_miners(
3869 dendrite=dendrite,
3870 metagraph=self.metagraph,
3871 uids=batched_uids,
3872 synapse=synapse,
3873 timeout=constants.TIER_CONFIG[tier].timeout,
3874 )
3875
3876 if not responses:
3877 logger.warning(f"No responses from {batched_uids}.")
3878 return
3879 try:
3880 logger.info(f"Validating responses for {batched_uids}.")
3881 logger.info(responses[0].compressed_context)
3882 (
3883 valid_responses,
3884 valid_uids,
3885 invalid_uids,
3886 invalid_reasons,
3887 ) = await vutils.loop.validate_responses(
3888 responses=responses,
3889 uids=batched_uids,
3890 tier_config=constants.TIER_CONFIG[tier],
3891 tokenizer=tokenizer,
3892 tier=tier,
3893 ground_truth_synapse=ground_truth_synapse,
3894 )
3895 except Exception as e:
3896 logger.error(f"Error validating responses: {e}")
3897 traceback.print_exc()
3898 return
3899 try:
3900 logger.info(
3901 f"Processing and scoring responses for valid_uids: {valid_uids}"
3902 )
3903 start_time = time.time()
3904 logs, total_uids = await vutils.loop.process_and_score_responses(
3905 miner_manager=self.miner_manager,
3906 valid_responses=valid_responses,
3907 valid_uids=valid_uids,
3908 invalid_uids=invalid_uids,
3909 ground_truth_synapse=ground_truth_synapse,
3910 model_name=model_name,
3911 task_config=task_config,
3912 tier_config=constants.TIER_CONFIG[tier],
3913 config=self.config,
3914 invalid_reasons=invalid_reasons,
3915 timeout=300,
3916 tier=tier,
3917 )
3918 end_time = time.time()
3919 logger.info(
3920 f"Time taken to process and score responses: {end_time - start_time:.2f} seconds"
3921 )
3922 except Exception as e:
3923 logger.error(f"Error processing and scoring responses: {e}")
3924 return
3925
3926 batch_information = (
3927 f"Batch Metrics - {tier} - {model_name} - {task_config.task}"
3928 )
3929 batch_report_df = vutils.loop.logging.log_as_dataframe(logs)
3930 logger.info(
3931 f"Logging dataframe {batch_information}:\n{batch_report_df.to_markdown()}"
3932 )
3933
3934 if self.config.validator.use_wandb:
3935 vutils.loop.logging.log_wandb(logs, total_uids, tier=tier)
3936
3937 await self.miner_manager.report(
3938 {
3939 "comparision": batch_report_df.to_dict(),
3940 "challenge": ground_truth_synapse.validator_payload,
3941 "task": task_config.task,
3942 "tier": tier,
3943 },
3944 "api/report-batch",
3945 )
3946
3947 except Exception as e:
3948 traceback.print_exc()
3949 logger.error(f"Error: {e}")
3950
3951 def set_weights(self):
3952 try:
3953 self.current_block = self.subtensor.get_current_block()
3954 except OSError as e:
3955 logger.warning(f"Subtensor not available, reconnecting: {e}")
3956 self.subtensor = bt.subtensor(config=self.config)
3957 logger.info("Reconnected to subtensor.")
3958 self.current_block = self.subtensor.get_current_block()
3959 except Exception as e:
3960 logger.error(f"Error getting current block: {e}")
3961 traceback.print_exc()
3962 return
3963 self.last_update = self.metagraph.last_update[self.uid]
3964 weights = self.miner_manager.get_normalized_ratings(
3965 top_percentage=constants.TOP_PERCENTAGE_FOR_ALLOCATING_WEIGHTS
3966 )
3967 (
3968 processed_weight_uids,
3969 processed_weights,
3970 ) = bt.utils.weight_utils.process_weights_for_netuid(
3971 uids=self.metagraph.uids,
3972 weights=weights,
3973 netuid=self.config.netuid,
3974 subtensor=self.subtensor,
3975 metagraph=self.metagraph,
3976 )
3977 (
3978 uint_uids,
3979 uint_weights,
3980 ) = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
3981 uids=processed_weight_uids, weights=processed_weights
3982 )
3983 if self.current_block > self.last_update + constants.SUBNET_TEMPO:
3984 weight_info = list(zip(uint_uids, uint_weights))
3985 weight_info_df = pd.DataFrame(weight_info, columns=["uid", "weight"])
3986 logger.info(f"Weight info:\n{weight_info_df.to_markdown()}")
3987 logger.info("Actually trying to set weights.")
3988 try:
3989 future = self.set_weights_executor.submit(
3990 self.subtensor.set_weights,
3991 netuid=self.config.netuid,
3992 wallet=self.wallet,
3993 uids=uint_uids,
3994 weights=uint_weights,
3995 )
3996 success, msg = future.result(timeout=120)
3997                if not success:
3998                    logger.error(f"Failed to set weights: {msg}")
3999                else:
4000                    logger.info(f"Set weights result: {success}")
4001            except Exception as e:
4002                logger.error(f"Failed to set weights: {e}")
4003                traceback.print_exc()
4004 else:
4005 logger.info(
4006 f"Not setting weights because current block {self.current_block} is not greater than last update {self.last_update} + tempo {constants.SUBNET_TEMPO}"
4007 )
4008
4009
4010if __name__ == "__main__":
4011 with Validator() as validator:
4012 while True:
4013 logger.info("validator_status", object=validator)
4014 time.sleep(60 * 10)
4015
4016
4017
4018---
4019File: /scripts/enable_hf_transfer.sh
4020---
4021
4022uv add hf_transfer --prerelease=allow
4023export HF_HUB_ENABLE_HF_TRANSFER=1
4024
4025
4026---
4027File: /scripts/install_redis.sh
4028---
4029
4030#!/bin/bash
4031
4032# Function to check if the user has root privileges
4033check_root() {
4034 if [ "$(id -u)" -ne 0 ]; then
4035 echo "You are not running as root. Commands requiring root will use 'sudo'."
4036 SUDO="sudo"
4037 else
4038 echo "You are running as root. 'sudo' is not required."
4039 SUDO=""
4040 fi
4041}
4042
4043# Run the root check
4044check_root
4045
4046# Update the package list
4047$SUDO apt update
4048
4049# Install Redis
4050$SUDO apt install -y redis
4051
4052
4053
4054# Verify installation
4055if redis-cli --version; then
4056 echo "Redis installed successfully."
4057else
4058 echo "Redis installation failed."
4059 exit 1
4060fi
4061
4062# Attempt to start Redis with systemctl
4063echo "Attempting to start Redis using systemctl..."
4064if $SUDO systemctl start redis 2>/dev/null; then
4065 echo "Redis started successfully using systemctl."
4066else
4067 echo "systemctl not available or failed. Starting Redis manually..."
4068 if redis-server --daemonize yes; then
4069 echo "Redis started manually in the background."
4070 else
4071 echo "Failed to start Redis. Check your setup."
4072 exit 1
4073 fi
4074fi
4075
4076# Enable Redis to start on boot (optional, for non-WSL environments)
4077if $SUDO systemctl enable redis 2>/dev/null; then
4078 echo "Redis enabled to start on boot using systemctl."
4079else
4080 echo "systemctl not available. Skipping boot configuration."
4081fi
4082
4083# Test Redis
4084if redis-cli ping | grep -q "PONG"; then
4085 echo "Redis is working correctly!"
4086else
4087 echo "Redis test failed. Check the service status or logs."
4088fi
4089
4090
4091
4092---
4093File: /services/miner_backend/soft_token/soft_token_condenser_modeling.py
4094---
4095
4096import torch
4097import torch.nn as nn
4098import huggingface_hub
4099from transformers import AutoModelForCausalLM, AutoTokenizer
4100
4101
4102class Condenser(nn.Module):
4103 """
4104 A neural module for condensing large text contexts into smaller dense representations
4105 """
4106
4107 def __init__(
4108 self,
4109 num_condense_tokens: int,
4110 hidden_size: int,
4111 n_last_hidden_states: int,
4112 condense_model: AutoModelForCausalLM,
4113 condense_tokenizer: AutoTokenizer,
4114 ):
4115 super().__init__()
4116 self.dtype = torch.bfloat16
4117 self.num_condense_tokens = num_condense_tokens
4118 self.hidden_size = hidden_size
4119 self.n_last_hidden_states = n_last_hidden_states
4120 self.condense_model = condense_model
4121 self.condense_tokenizer = condense_tokenizer
4122
4123 self.norm = nn.LayerNorm(self.hidden_size * n_last_hidden_states).to(
4124 dtype=self.dtype, device="cuda"
4125 )
4126 self.linear = nn.Linear(self.hidden_size * n_last_hidden_states, 4096).to(
4127 dtype=self.dtype, device="cuda"
4128 )
4129 self.pre_condensed_tokens = nn.Parameter(
4130 torch.randn(
4131 1,
4132 num_condense_tokens,
4133 self.hidden_size,
4134 dtype=self.dtype,
4135 device="cuda",
4136 )
4137 )
4138
4139 @classmethod
4140 def from_pretrained(cls, repo_id, local_dir="./", dtype=torch.bfloat16):
4141 # Download and load checkpoint
4142 file_path = huggingface_hub.hf_hub_download(
4143 repo_id=repo_id,
4144 filename="checkpoints/modules.pt",
4145 local_dir=local_dir,
4146 )
4147 state_dict = torch.load(file_path)
4148
4149 # Extract model configuration
4150 num_condense_tokens = state_dict["modules"]["pre_condensed_tokens"].shape[1]
4151 hidden_size = state_dict["modules"]["pre_condensed_tokens"].shape[2]
4152 linear_input_dim = state_dict["modules"]["linear_state_dict"]["weight"].shape[1]
4153 n_last_hidden_states = linear_input_dim // hidden_size
4154
4155 # Load model and tokenizer
4156 condense_model = AutoModelForCausalLM.from_pretrained(
4157 repo_id, torch_dtype=dtype
4158 ).to("cuda")
4159 condense_tokenizer = AutoTokenizer.from_pretrained("unsloth/Llama-3.2-1B")
4160 condense_tokenizer.pad_token = condense_tokenizer.eos_token
4161
4162 # Initialize and load state_dict
4163 model = cls(
4164 num_condense_tokens,
4165 hidden_size,
4166 n_last_hidden_states,
4167 condense_model,
4168 condense_tokenizer,
4169 )
4170 model.load_state_dict(state_dict["modules"])
4171 return model
4172
4173 def load_state_dict(self, state_dict: dict):
4174 self.pre_condensed_tokens.data = state_dict["pre_condensed_tokens"].to(
4175 dtype=self.dtype, device="cuda"
4176 )
4177 self.linear.load_state_dict(
4178 {
4179 k: v.to(dtype=self.dtype, device="cuda")
4180 for k, v in state_dict["linear_state_dict"].items()
4181 }
4182 )
4183 self.norm.load_state_dict(
4184 {
4185 k: v.to(dtype=self.dtype, device="cuda")
4186 for k, v in state_dict["norm_state_dict"].items()
4187 }
4188 )
4189
4190 @torch.no_grad()
4191 def compress(self, context: str) -> torch.Tensor:
4192 # Tokenize and process context
4193 output = self.condense_tokenizer(
4194 context,
4195 return_tensors="pt",
4196 add_special_tokens=False,
4197 padding="max_length",
4198 max_length=4096,
4199 truncation=True,
4200 return_attention_mask=True,
4201 )
4202 context_ids = output.input_ids.to(device="cuda")
4203 attention_mask = output.attention_mask.to(device="cuda")
4204
4205 # Processing embedding and condensation
4206 context_embeds = self.condense_model.get_input_embeddings()(context_ids)
4207 inputs_embeds_condense = torch.cat(
4208 [context_embeds, self.pre_condensed_tokens], dim=1
4209 )
4210 expanded_attention_mask = torch.cat(
4211 [
4212 attention_mask,
4213 torch.ones(
4214 attention_mask.shape[0],
4215 self.num_condense_tokens,
4216 dtype=attention_mask.dtype,
4217 device=attention_mask.device,
4218 ),
4219 ],
4220 dim=1,
4221 )
4222
4223 # Generate condensed tokens
4224 output = self.condense_model(
4225 inputs_embeds=inputs_embeds_condense,
4226 output_hidden_states=True,
4227 attention_mask=expanded_attention_mask,
4228 )
4229 hidden_states = torch.cat(
4230 output.hidden_states[-self.n_last_hidden_states :], dim=-1
4231 )[:, -self.num_condense_tokens :, :]
4232
4233 return self.linear(self.norm(hidden_states))
4234
4235
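A minimal usage sketch for the `Condenser` module above. The repo id is the one the miner backend loads for its soft-token algorithm; the import path is illustrative, and since the module hard-codes `device="cuda"`, a GPU is required:

```python
# Illustrative import path; in the repository the class lives in
# services/miner_backend/soft_token/soft_token_condenser_modeling.py.
from soft_token_condenser_modeling import Condenser

condenser = Condenser.from_pretrained("Condense-AI/Soft-Token-Condenser-Llama-3.2-1B")
condenser.eval()

# compress() is already wrapped in torch.no_grad(); it returns a
# (1, num_condense_tokens, 4096) tensor of soft tokens for the target model.
soft_tokens = condenser.compress("A long context to be condensed into a few soft tokens...")
print(soft_tokens.shape)
```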
4236
4237---
4238File: /services/miner_backend/app.py
4239---
4240
4241from flask import Flask, request, jsonify
4242import time
4243from transformers import AutoModelForCausalLM, AutoTokenizer, DynamicCache
4244from kvpress import KnormPress
4245import torch
4246from .soft_token.soft_token_condenser_modeling import Condenser
4247import os
4248import minio
4249import structlog
4250from .utils import upload_to_minio
4251import argparse
4252
4253logger = structlog.get_logger()
4254logger.info("This will show in Uvicorn logs")
4255
4256
4257class CompressionService:
4258 def __init__(self, algorithm: str):
4259 self.dtype = torch.bfloat16
4260 self.algorithm = algorithm
4261 self._init_minio_client()
4262 self._init_model()
4263
4264 def _init_minio_client(self):
4265 """Initialize MinIO client and validate config"""
4266 self.bucket_name = os.getenv("MINIO_BUCKET", "condense_miner")
4267 self.endpoint_url = os.getenv("MINIO_SERVER")
4268
4269 self._validate_minio_config()
4270
4271 self.minio_client = minio.Minio(
4272 self.endpoint_url.replace("http://", "").replace("https://", ""),
4273 access_key=os.getenv("MINIO_ACCESS_KEY"),
4274 secret_key=os.getenv("MINIO_SECRET_KEY"),
4275 secure=False,
4276 )
4277
4278 def _init_model(self):
4279 """Initialize model based on selected algorithm"""
4280 self.device = "cuda"
4281
4282 if self.algorithm == "kvpress":
4283 self.ckpt = "Condense-AI/Mistral-7B-Instruct-v0.2"
4284 self.tokenizer = AutoTokenizer.from_pretrained(self.ckpt)
4285 self.model = AutoModelForCausalLM.from_pretrained(
4286 self.ckpt, torch_dtype=self.dtype
4287 ).to(self.device)
4288 self.press = KnormPress(compression_ratio=0.75)
4289
4290 elif self.algorithm == "soft_token":
4291 self.ckpt = "Condense-AI/Mistral-7B-Instruct-v0.2"
4292 self.repo_id = "Condense-AI/Soft-Token-Condenser-Llama-3.2-1B"
4293 self.condenser = Condenser.from_pretrained(self.repo_id, dtype=self.dtype)
4294 self.condenser.eval()
4295 self.tokenizer = AutoTokenizer.from_pretrained(self.ckpt)
4296 self.model = AutoModelForCausalLM.from_pretrained(
4297 self.ckpt, torch_dtype=self.dtype
4298 ).to(self.device)
4299 self.press = KnormPress(compression_ratio=0.75)
4300
4301 elif self.algorithm == "activation_beacon":
4302 self.ckpt = "namespace-Pt/ultragist-mistral-7b-inst"
4303 self.tokenizer = AutoTokenizer.from_pretrained(
4304 self.ckpt,
4305 trust_remote_code=True,
4306 )
4307 self.model = AutoModelForCausalLM.from_pretrained(
4308 self.ckpt,
4309 trust_remote_code=True,
4310 torch_dtype=self.dtype,
4311 attn_implementation="sdpa",
4312 ultragist_ratio=[4],
4313 ).to(self.device)
4314
4315 @torch.no_grad()
4316 def compress_context(self, context: str) -> str:
4317 """Compress context using selected algorithm"""
4318 if self.algorithm == "kvpress":
4319 return self._compress_kvpress(context)
4320 elif self.algorithm == "soft_token":
4321 return self._compress_soft_token(context)
4322 elif self.algorithm == "activation_beacon":
4323 return self._compress_activation_beacon(context)
4324
4325 def _compress_kvpress(self, context: str) -> str:
4326 input_ids = self.tokenizer(
4327 context, return_tensors="pt", add_special_tokens=False
4328 ).input_ids.to(self.device)
4329
4330 with torch.no_grad(), self.press(self.model):
4331 past_key_values = self.model(
4332 input_ids, num_logits_to_keep=1
4333 ).past_key_values
4334
4335 return self._save_and_return_url(past_key_values)
4336
4337 def _compress_soft_token(self, context: str) -> str:
4338 compressed_tokens = self.condenser.compress(context)
4339
4340 with torch.no_grad(), self.press(self.model):
4341 past_key_values = self.model(
4342 inputs_embeds=compressed_tokens
4343 ).past_key_values
4344
4345 return self._save_and_return_url(past_key_values)
4346
4347 def _compress_activation_beacon(self, context: str) -> str:
4348 input_ids = self.tokenizer(context, return_tensors="pt").input_ids.to(
4349 self.device
4350 )
4351
4352 self.model.memory.reset()
4353 self.model(input_ids=input_ids)
4354 past_key_values = self.model.memory.get_memory()
4355
4356 # Log metrics specific to activation beacon
4357 ultragist_size, raw_size, sink_size = self.model.memory.get_memory_size()
4358 logger.info(
4359 "model_metrics",
4360 ultragist_size=ultragist_size,
4361 raw_size=raw_size,
4362 sink_size=sink_size,
4363 )
4364
4365 return self._save_and_return_url(past_key_values)
4366
4367 def _save_and_return_url(self, past_key_values):
4368 """Process output and save to MinIO"""
4369 DynamicCache(past_key_values)
4370
4371 numpy_past_key_values = tuple(
4372 tuple(tensor.to(dtype=torch.float32).cpu().numpy() for tensor in tensors)
4373 for tensors in past_key_values
4374 )
4375
4376 filename = f"{int(time.time_ns())}.npy"
4377 upload_to_minio(
4378 self.minio_client, self.bucket_name, filename, numpy_past_key_values
4379 )
4380
4381 return f"{self.endpoint_url}/{self.bucket_name}/{filename}"
4382
4383 def _validate_minio_config(self):
4384 """Validate MinIO configuration"""
4385 if not self.endpoint_url:
4386 raise ValueError("MINIO_SERVER is not set")
4387 if not self.bucket_name:
4388 raise ValueError("MINIO_BUCKET is not set")
4389 if not os.getenv("MINIO_ACCESS_KEY"):
4390 raise ValueError("MINIO_ACCESS_KEY is not set")
4391 if not os.getenv("MINIO_SECRET_KEY"):
4392 raise ValueError("MINIO_SECRET_KEY is not set")
4393 if not (
4394 self.endpoint_url.startswith("http://")
4395 or self.endpoint_url.startswith("https://")
4396 ):
4397 raise ValueError("MINIO_SERVER must start with http:// or https://")
4398
4399
4400def create_app(algorithm):
4401 app = Flask(__name__)
4402 service = CompressionService(algorithm)
4403
4404 @app.route("/condense", methods=["POST"])
4405 def compress_endpoint():
4406 """Endpoint for compressing context"""
4407 data = request.get_json()
4408 context = data.get("context")
4409 target_model = data.get("target_model")
4410
4411 if not context:
4412 return jsonify({"error": "Missing 'context' in request"}), 400
4413
4414 try:
4415 compressed_kv_url = service.compress_context(context)
4416 return jsonify(
4417 {"target_model": target_model, "compressed_kv_url": compressed_kv_url}
4418 )
4419 except Exception as e:
4420 logger.exception("compression_failed", error=str(e))
4421 return (
4422 jsonify({"error": "Failed to process request", "details": str(e)}),
4423 500,
4424 )
4425
4426 return app
4427
4428
4429# Entry point. Note: because of the relative imports above, this file should be run as a module (python -m), not as a standalone script.
4430if __name__ == "__main__":
4431 parser = argparse.ArgumentParser()
4432 parser.add_argument(
4433 "--algorithm",
4434 default="kvpress",
4435 choices=["kvpress", "soft_token", "activation_beacon"],
4436 )
4437 args = parser.parse_args()
4438 app = create_app(args.algorithm)
4439 app.run()
4440
4441
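For reference, the research-tier backend above exposes a single `/condense` route that takes a JSON body with `context` and `target_model` and responds with a `compressed_kv_url`. A minimal client sketch, assuming the Flask app is reachable on `localhost:8080` (the default used by `tests/test_miner_backend.py`); the payload values are placeholders:

```python
import requests

# Assumed local address of the miner backend; adjust host/port to your deployment.
BACKEND_URL = "http://localhost:8080/condense"

payload = {
    "context": "Long conversation or document to be condensed...",
    "target_model": "Condense-AI/Mistral-7B-Instruct-v0.2",
}

response = requests.post(BACKEND_URL, json=payload, timeout=60)
response.raise_for_status()

# The research-tier backend replies with a MinIO URL pointing at the serialized KV cache.
print(response.json()["compressed_kv_url"])
```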
4442
4443---
4444File: /services/miner_backend/object_cleaner.py
4445---
4446
4447import time
4448import os
4449from minio import Minio
4450import structlog
4451from datetime import datetime, timezone
4452
4453logger = structlog.get_logger()
4454
4455
4456class MinioCleanup:
4457 def __init__(self):
4458 self.bucket_name = os.getenv("MINIO_BUCKET", "condense_miner")
4459 self.minio_client = Minio(
4460 f"localhost:{os.getenv('MINIO_PORT')}",
4461 access_key=os.getenv("MINIO_ACCESS_KEY"),
4462 secret_key=os.getenv("MINIO_SECRET_KEY"),
4463 secure=False,
4464 )
4465
4466 def cleanup_old_objects(self, max_age_hours=1):
4467 """Delete objects older than max_age_hours"""
4468 try:
4469 # Get current time in UTC
4470 now = datetime.now(timezone.utc)
4471
4472 # List all objects in the bucket
4473 objects = list(self.minio_client.list_objects(self.bucket_name))
4474 total_files = len(objects)
4475 expired_files = 0
4476
4477 for obj in objects:
4478 # Calculate age in hours
4479 age = (now - obj.last_modified).total_seconds() / 3600
4480
4481 if age > max_age_hours:
4482 self.minio_client.remove_object(self.bucket_name, obj.object_name)
4483 expired_files += 1
4484 logger.info(
4485 "deleted_object",
4486 object_name=obj.object_name,
4487 age_hours=round(age, 2),
4488 )
4489
4490 # Log summary statistics
4491 logger.info(
4492 "cleanup_summary",
4493 total_files=total_files,
4494 expired_files=expired_files,
4495 remaining_files=total_files - expired_files,
4496 bucket_name=self.bucket_name,
4497 )
4498
4499 except Exception as e:
4500 logger.exception("cleanup_failed", error=str(e))
4501
4502
4503def main():
4504 cleanup = MinioCleanup()
4505
4506 # Run cleanup every 5 minutes
4507 while True:
4508 cleanup.cleanup_old_objects()
4509 time.sleep(300) # Sleep for 5 minutes
4510
4511
4512if __name__ == "__main__":
4513 main()
4514
4515
4516
4517---
4518File: /services/miner_backend/universal_app.py
4519---
4520
4521from flask import Flask, request, jsonify
4522from llmlingua import PromptCompressor
4523import structlog
4524import argparse
4525
4526logger = structlog.get_logger()
4527logger.info("This will show in Universal Miner Backend logs")
4528
4529
4530class CompressionService:
4531 def __init__(self, algorithm: str):
4532 self.algorithm = algorithm
4533 self._init_compressor()
4534
4535 def _init_compressor(self):
4536 """Initialize model based on selected algorithm"""
4537 self.device = "cuda"
4538
4539 if self.algorithm == "llmlingua":
4540 self.compressor = PromptCompressor()
4541 elif self.algorithm == "llmlingua-2":
4542 self.compressor = PromptCompressor(
4543 model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
4544 use_llmlingua2=True,
4545 )
4546
4547 def compress_context(self, context: str) -> str:
4548 """Compress context using selected algorithm"""
4549 if self.algorithm == "llmlingua":
4550 compressed_prompt = self.compressor.compress_prompt(
4551 context, instruction="", question="", target_token=200
4552 )
4553 elif self.algorithm == "llmlingua-2":
4554 compressed_prompt = self.compressor.compress_prompt(
4555 context, rate=0.7, force_tokens=["\n", "?"]
4556 )
4557 return compressed_prompt["compressed_prompt"]
4558
4559
4560def create_app(algorithm):
4561 app = Flask(__name__)
4562 service = CompressionService(algorithm)
4563
4564 @app.route("/condense", methods=["POST"])
4565 def compress_endpoint():
4566        """Endpoint for compressing context"""
4567        logger.info("Received request at compression endpoint")
4568 data = request.get_json()
4569 context = data.get("context")
4570 target_model = data.get("target_model")
4571
4572 if not context:
4573 return jsonify({"error": "Missing 'context' in request"}), 400
4574
4575 try:
4576 compressed_context = service.compress_context(context)
4577 return jsonify(
4578 {"target_model": target_model, "compressed_context": compressed_context}
4579 )
4580 except Exception as e:
4581 logger.exception("compression_failed", error=str(e))
4582 return (
4583 jsonify({"error": "Failed to process request", "details": str(e)}),
4584 500,
4585 )
4586
4587 return app
4588
4589
4590# This allows direct running of the file
4591if __name__ == "__main__":
4592 parser = argparse.ArgumentParser()
4593 parser.add_argument(
4594 "--algorithm",
4595 default="llmlingua-2",
4596 choices=["llmlingua", "llmlingua-2"],
4597 )
4598 args = parser.parse_args()
4599 app = create_app(args.algorithm)
4600 app.run()
4601
4602
4603
4604---
4605File: /services/miner_backend/utils.py
4606---
4607
4608import minio
4609import io
4610import numpy as np
4611
4612
4613def upload_to_minio(
4614 minio_client: minio.Minio,
4615 bucket_name: str,
4616 object_name: str,
4617 data: tuple[tuple[np.ndarray, ...], ...],
4618):
4619 buffer = io.BytesIO()
4620 np.save(buffer, data)
4621 length = buffer.tell()
4622 buffer.seek(0)
4623 result = minio_client.put_object(bucket_name, object_name, buffer, length)
4624 return result
4625
4626
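`upload_to_minio` serializes the nested tuple of key/value arrays with `np.save` and uploads the buffer as a single `.npy` object. A minimal sketch of the reverse path, mirroring `load_compressed_kv` in the validator scoring service; the URL is a placeholder and assumes the object is readable at the address returned by `_save_and_return_url`:

```python
import io

import numpy as np
import requests

# Placeholder URL of the kind returned by the miner backend (MinIO endpoint/bucket/object).
compressed_kv_url = "http://localhost:9000/condense_miner/1700000000000000000.npy"

buffer = io.BytesIO(requests.get(compressed_kv_url, timeout=30).content)

# Same deserialization as load_compressed_kv in the scoring service.
past_key_values = np.load(buffer).astype(np.float32)
print(past_key_values.shape)
```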
4627
4628---
4629File: /services/validator_backend/organic/app.py
4630---
4631
4632import neural_condense_core.validator_utils as vutils
4633from neural_condense_core.base.config import add_common_config, add_validator_config
4634import argparse
4635import bittensor as bt
4636from neural_condense_core.logger import logger
4637import time
4638
4639
4640def setup_config():
4641 parser = argparse.ArgumentParser()
4642 parser = add_common_config(parser)
4643 parser = add_validator_config(parser)
4644 config = bt.config(parser)
4645 logger.info(f"Config: {config}")
4646 return config
4647
4648
4649def setup_bittensor_objects(config: bt.config):
4650 wallet = bt.wallet(config=config)
4651 subtensor = bt.subtensor(config=config)
4652 metagraph = subtensor.metagraph(config.netuid)
4653 return wallet, metagraph
4654
4655
4656def setup_miner_manager(config: bt.config, wallet, metagraph: bt.metagraph):
4657 neuron_uid = metagraph.hotkeys.index(wallet.hotkey.ss58_address)
4658 miner_manager = vutils.managing.MinerManager(
4659 uid=neuron_uid, wallet=wallet, metagraph=metagraph, config=None
4660 )
4661 return miner_manager
4662
4663
4664def setup_organic_gate(config: bt.config, miner_manager: vutils.managing.MinerManager):
4665 organic_gate = vutils.monetize.OrganicGate(
4666 miner_manager=miner_manager, config=config
4667 )
4668 return organic_gate
4669
4670
4671def main():
4672 config = setup_config()
4673 wallet, metagraph = setup_bittensor_objects(config)
4674 miner_manager = setup_miner_manager(config, wallet, metagraph)
4675 organic_gate = setup_organic_gate(config, miner_manager)
4676 organic_gate.start_server()
4677
4678
4679if __name__ == "__main__":
4680 main()
4681 while True:
4682 time.sleep(60)
4683 logger.info("Running...")
4684
4685
4686
4687---
4688File: /services/validator_backend/scoring/anti_exploitation/filter_existance.py
4689---
4690
4691from transformers import AutoTokenizer, AutoModelForCausalLM, DynamicCache
4692from typing import List
4693import random
4694from semantic_text_splitter import TextSplitter
4695from copy import deepcopy
4696from datasets import load_dataset
4697from typing import Tuple
4698from ..utils import generate_answer
4699import re
4700import structlog
4701
4702logger = structlog.get_logger("filter_existance")
4703
4704
4705class FilterExistanceChecker:
4706 def _check_text_exists(
4707 self,
4708 tokenizer: AutoTokenizer,
4709 model: AutoModelForCausalLM,
4710 kv_cache: DynamicCache,
4711 query_chunk: str,
4712 context_length: int,
4713    ) -> Tuple[bool, bool]:
4714 _kv_cache = deepcopy(kv_cache)
4715 prompt = f"""
4716You are a precise and objective fact-checker. Your task is to determine whether the following quoted text appears in the provided context or is a direct paraphrase of it.
4717
4718Instructions:
4719- Consider the context to include information that might have been rephrased but retains the original meaning.
4720- Return 'yes' if the quoted text appears or is a clear paraphrase of the context.
4721- Return 'no' if the quoted text does not appear or if it is not a valid paraphrase.
4722- Your response should contain exactly one word: either 'yes' or 'no'. No additional text or explanations are required.
4723
4724Quote:
4725```
4726{query_chunk}
4727```
4728[/INST] """
4729 logger.info(f"Filter Prompt: {prompt}")
4730 prompt_ids = tokenizer.encode(
4731 prompt,
4732 return_tensors="pt",
4733 add_special_tokens=False,
4734 )
4735 completion_text = generate_answer(
4736 model=model,
4737 tokenizer=tokenizer,
4738 question_ids=prompt_ids,
4739 cache=_kv_cache,
4740 context_length=context_length,
4741 max_new_tokens=8,
4742 )
4743 logger.info(f"Filter Completion: {completion_text}")
4744 # Split response into words and clean up
4745 words = re.findall(r"\b\w+\b", completion_text.lower())
4746 return "yes" in words, "no" in words or "not" in words
4747
4748 def filter_existance(
4749 self,
4750 tokenizer: AutoTokenizer,
4751 model: AutoModelForCausalLM,
4752 kv_cache: DynamicCache,
4753 positive_chunks: List[str],
4754 negative_chunks: List[str],
4755 context_length: int,
4756 ) -> float:
4757 positive_accuracies = []
4758 for positive_chunk in positive_chunks:
4759 exist_yes, exist_no = self._check_text_exists(
4760 tokenizer, model, kv_cache, positive_chunk, context_length
4761 )
4762 if not exist_yes or (exist_yes and exist_no):
4763 positive_accuracies.append(0)
4764 else:
4765 positive_accuracies.append(1)
4766 if not any(positive_accuracies):
4767            logger.info(f"All positive existence checks failed: {positive_accuracies}")
4768 return 0
4769 negative_accuracies = []
4770 # Test on negative case (text not from conversation)
4771 for negative_chunk in negative_chunks:
4772 exist_yes, exists_no = self._check_text_exists(
4773 tokenizer, model, kv_cache, negative_chunk, context_length
4774 )
4775 if not exists_no or (exist_yes and exists_no):
4776 negative_accuracies.append(0)
4777 else:
4778 negative_accuracies.append(1)
4779 if not any(negative_accuracies):
4780            logger.info(f"All negative existence checks failed: {negative_accuracies}")
4781 return 0
4782 accuracies = positive_accuracies + negative_accuracies
4783 logger.info(f"Filter Existance Accuracy: {accuracies}")
4784 return sum(accuracies) / len(accuracies)
4785
4786
4787
4788---
4789File: /services/validator_backend/scoring/metric_handlers/__init__.py
4790---
4791
4792from .accuracy import accuracy, preprocess_batch as accuracy_preprocess_batch
4793
4794metric_handlers = {
4795 "accuracy": {
4796 "handler": accuracy,
4797 "preprocess_batch": accuracy_preprocess_batch,
4798 },
4799}
4800
4801
4802
4803---
4804File: /services/validator_backend/scoring/metric_handlers/accuracy.py
4805---
4806
4807import torch
4808from transformers import (
4809 AutoTokenizer,
4810 DynamicCache,
4811 AutoModelForCausalLM,
4812 TextGenerationPipeline,
4813)
4814import structlog
4815from ..anti_exploitation.filter_existance import FilterExistanceChecker
4816from ..utils import generate_answer
4817from neural_condense_core.protocol import TaskData
4818from copy import deepcopy
4819
4820logger = structlog.get_logger("accuracy")
4821
4822DEFAULT_VALUE = 0
4823
4824
4825def accuracy(
4826 filter_existance_checker: FilterExistanceChecker,
4827 kv_cache: DynamicCache,
4828 task_data: TaskData,
4829 tokenizer: AutoTokenizer,
4830 model: AutoModelForCausalLM,
4831 judge_pipeline: TextGenerationPipeline,
4832 max_tokens: int = 256,
4833 **kwargs,
4834) -> float:
4835 context = task_data.formatted_context
4836 positive_chunks = task_data.positive_chunks
4837 negative_chunks = task_data.negative_chunks
4838 formatted_questions = task_data.formatted_questions
4839 questions = task_data.challenge_questions
4840 answers = task_data.challenge_answers
4841
4842 device = model.device
4843 context_ids = tokenizer.encode(
4844 context,
4845 return_tensors="pt",
4846 add_special_tokens=False,
4847 ).to(device=device, dtype=torch.long)
4848 context_length = context_ids.shape[1]
4849 num_seen_tokens = kv_cache._seen_tokens
4850 logger.debug("condense-length", length=num_seen_tokens)
4851 chunk_existance_accuracy: float = filter_existance_checker.filter_existance(
4852 tokenizer=tokenizer,
4853 model=model,
4854 kv_cache=kv_cache,
4855 positive_chunks=positive_chunks,
4856 negative_chunks=negative_chunks,
4857 context_length=context_length,
4858 )
4859 if chunk_existance_accuracy <= 0.1:
4860 logger.info(
4861            f"Chunk existence accuracy too low, skipping scoring: {chunk_existance_accuracy}"
4862 )
4863 return 0
4864
4865 questions_ids = [
4866 tokenizer(
4867 question,
4868 return_tensors="pt",
4869 add_special_tokens=False,
4870 max_length=max_tokens,
4871 ).input_ids.to(device=device, dtype=torch.long)
4872 for question in formatted_questions
4873 ]
4874 accuracies = []
4875 for question_ids, formatted_question, answer, question in zip(
4876 questions_ids, formatted_questions, answers, questions
4877 ):
4878 expected_completion_ids = tokenizer(
4879 answer,
4880 return_tensors="pt",
4881 add_special_tokens=False,
4882 ).input_ids.to(device=device, dtype=torch.long)
4883 n_expected_completion_tokens = expected_completion_ids.shape[1]
4884 max_new_tokens = max(int(n_expected_completion_tokens * 1.5), 8)
4885 _kv_cache = deepcopy(kv_cache)
4886 logger.debug("kv_length", length=_kv_cache._seen_tokens)
4887 completion = generate_answer(
4888 model=model,
4889 tokenizer=tokenizer,
4890 question_ids=question_ids,
4891 cache=_kv_cache,
4892 context_length=context_length,
4893 max_new_tokens=max_new_tokens,
4894 )
4895 ground_truth = answer.strip()
4896 logger.debug(f"Question: {formatted_question}")
4897 logger.debug(f"Completion: {completion}")
4898 logger.debug(f"Ground truth: {ground_truth}")
4899 accuracy = get_accuracy_llm(completion, ground_truth, question, judge_pipeline)
4900 accuracies.append(accuracy)
4901 logger.info(f"Accuracies: {accuracies}")
4902 return chunk_existance_accuracy * sum(accuracies) / len(accuracies)
4903
4904
4905def preprocess_batch(values: list[float]) -> list[float]:
4906 return [value if value is not None else DEFAULT_VALUE for value in values]
4907
4908
4909def get_accuracy_llm(
4910 completion: str,
4911 ground_truth: str,
4912 question: str,
4913 judge_pipeline: TextGenerationPipeline,
4914) -> float:
4915 messages = [
4916 {
4917 "role": "system",
4918 "content": "You are a helpful assistant that evaluates the correctness of a response to a question based on the ground truth.",
4919 },
4920 {
4921 "role": "user",
4922 "content": f"""Please evaluate the correctness of the following response to the question based on the ground truth.\n\n**Question**: {question}\n\n**Response**: {completion}\n\n**Ground truth**: {ground_truth}
4923Return 'yes' if the response is correct and 'no' if it is incorrect. A response is correct if it has the same meaning as the ground truth; it does not need to match it word for word. Return only 'yes' or 'no', with no explanation.
4924""",
4925 },
4926 ]
4927 completion = judge_pipeline(
4928 messages,
4929 do_sample=False,
4930 max_new_tokens=16,
4931 )[0][
4932 "generated_text"
4933 ][-1]["content"]
4934 logger.debug(f"LLM Judge Response: {completion}")
4935 is_correct = "yes" in completion.lower()
4936 return 1 if is_correct else 0
4937
4938
4939
4940---
4941File: /services/validator_backend/scoring/app.py
4942---
4943
4944# TODO: Efficient switching between target models. Currently fixed to mistral-7b-instruct-v0.2.
4945from flask import Flask, request, jsonify
4946import torch
4947from transformers import (
4948 AutoTokenizer,
4949 AutoModelForCausalLM,
4950 DynamicCache,
4951 TextGenerationPipeline,
4952)
4953import random
4954import structlog
4955import gc
4956from neural_condense_core.protocol import BatchedScoringRequest
4957import traceback
4958from .metric_handlers import metric_handlers
4959from .anti_exploitation.filter_existance import FilterExistanceChecker
4960import time
4961import numpy as np
4962import io
4963
4964gc.enable()
4965
4966logger = structlog.get_logger("Validator-Backend")
4967
4968
4969def load_compressed_kv(filename: str) -> np.ndarray:
4970 with open(filename, "rb") as f:
4971 buffer = io.BytesIO(f.read())
4972 return np.load(buffer).astype(np.float32)
4973
4974
4975class ScoringService:
4976 def __init__(self):
4977 """
4978        Initializes the ScoringService with the target model and tokenizer, the LLM judge
4979        pipeline, and the chunk-existence filter used for anti-exploitation checks.
4980 """
4981 self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
4982 self.dtype = torch.bfloat16
4983 self.model = AutoModelForCausalLM.from_pretrained(
4984 "Condense-AI/Mistral-7B-Instruct-v0.2"
4985 ).to(dtype=self.dtype, device=self.device)
4986 self.tokenizer = AutoTokenizer.from_pretrained(
4987 "Condense-AI/Mistral-7B-Instruct-v0.2"
4988 )
4989
4990 j_tokenizer = AutoTokenizer.from_pretrained(
4991 "upstage/solar-pro-preview-instruct"
4992 )
4993 j_model = AutoModelForCausalLM.from_pretrained(
4994 "upstage/solar-pro-preview-instruct",
4995 torch_dtype=self.dtype,
4996 trust_remote_code=True,
4997 )
4998 self.judge_pipeline = TextGenerationPipeline(
4999 model=j_model,
5000 tokenizer=j_tokenizer,
5001 device=self.device,
5002 torch_dtype=self.dtype,
5003 )
5004 self.filter_existance_checker = FilterExistanceChecker()
5005
5006 @torch.no_grad()
5007 def get_metrics(self, request: BatchedScoringRequest) -> dict[str, float]:
5008 logger.info("Received request")
5009 criteria = random.choice(request.criterias)
5010 values = []
5011 metric_handler = metric_handlers[criteria]["handler"]
5012 preprocess_batch = metric_handlers[criteria]["preprocess_batch"]
5013 logger.info(
5014 "positive_chunks",
5015 positive_chunks=request.task_data.positive_chunks,
5016 )
5017 logger.info(
5018 "negative_chunks",
5019 negative_chunks=request.task_data.negative_chunks,
5020 )
5021 for miner_response in request.miner_responses:
5022 try:
5023 kv_cache = DynamicCache.from_legacy_cache(
5024 torch.from_numpy(load_compressed_kv(miner_response.filename)).to(
5025 device=self.device, dtype=self.dtype
5026 )
5027 )
5028 start_time = time.time()
5029 value = metric_handler(
5030 filter_existance_checker=self.filter_existance_checker,
5031 kv_cache=kv_cache,
5032 model=self.model,
5033 tokenizer=self.tokenizer,
5034 task_data=request.task_data,
5035 judge_pipeline=self.judge_pipeline,
5036 )
5037 end_time = time.time()
5038 logger.info(
5039 "metric_handler_time",
5040 handler_name=metric_handler.__name__,
5041 time_taken=f"{end_time - start_time:.2f}s",
5042 )
5043 except Exception as e:
5044 logger.error(
5045 "metric_handler_error",
5046 error=str(e),
5047 handler_name=metric_handler.__name__,
5048 traceback=traceback.format_exc(),
5049 )
5050 value = None
5051 values.append(value)
5052 logger.info(
5053 "metric_value", handler_name=metric_handler.__name__, value=value
5054 )
5055 values = preprocess_batch(values)
5056 return {"metrics": {criteria: values}}
5057
5058
5059app = Flask(__name__)
5060scoring_service = ScoringService()
5061
5062
5063@app.route("/", methods=["GET"])
5064def is_alive():
5065 return jsonify({"message": "I'm alive!"})
5066
5067
5068@app.route("/get_metrics", methods=["POST"])
5069def get_metrics():
5070 request_data = BatchedScoringRequest(**request.get_json())
5071 return jsonify(scoring_service.get_metrics(request_data))
5072
5073
5074
5075---
5076File: /services/validator_backend/scoring/datatypes.py
5077---
5078
5079from typing import Any, List
5080from pydantic import BaseModel
5081import numpy as np
5082import io
5083import base64
5084
5085
5086class MinerResponse(BaseModel):
5087 filename: str
5088 compressed_kv: Any = None
5089
5090 def decode(self):
5091 self.compressed_kv = load_npy_from_filename(self.filename)
5092
5093
5094class GroundTruthRequest(BaseModel):
5095 context: str
5096 expected_completion: str
5097 activation_prompt: str
5098 model_name: str
5099 messages: List[dict]
5100 hidden_messages: List[dict]
5101 criterias: List[str]
5102 positive_chunks: List[str]
5103 negative_chunks: List[str]
5104
5105
5106class BatchedScoringRequest(BaseModel):
5107 miner_responses: List[MinerResponse]
5108 ground_truth_request: GroundTruthRequest
5109
5110
5111def load_npy_from_filename(filename: str) -> np.ndarray:
5112 with open(filename, "rb") as f:
5113 buffer = io.BytesIO(f.read())
5114 return np.load(buffer).astype(np.float32)
5115
5116
5117def base64_to_ndarray(base64_str: str) -> np.ndarray:
5118    """Convert a base64-encoded string back to a NumPy array."""
5119    try:
5120 buffer = io.BytesIO(base64.b64decode(base64_str))
5121 buffer.seek(0)
5122 array = np.load(buffer)
5123 array = array.astype(np.float32)
5124 except Exception as e:
5125 print(e)
5126 return None
5127 return array
5128
5129
5130def ndarray_to_base64(array: np.ndarray) -> str:
5131    """Convert a NumPy array to a base64-encoded string."""
5132    try:
5133 buffer = io.BytesIO()
5134 np.save(buffer, array)
5135 buffer.seek(0)
5136 base64_str = base64.b64encode(buffer.read()).decode("utf-8")
5137 except Exception as e:
5138 print(e)
5139 return ""
5140 return base64_str
5141
5142
5143
5144---
5145File: /services/validator_backend/scoring/utils.py
5146---
5147
5148from .datatypes import (
5149 MinerResponse,
5150 GroundTruthRequest,
5151 BatchedScoringRequest,
5152 ndarray_to_base64,
5153)
5154import torch
5155from transformers import AutoModelForCausalLM, AutoTokenizer, DynamicCache
5156
5157
5158def unit_test(self):
5159 """
5160 Runs a basic unit test to verify the setup and scoring functions for a sample request.
5161 """
5162 try:
5163 data = {
5164 "context": "<s> [INST] Provided the context: French senior civil servant arrested on suspicion of spying for North Korea\n\nNovember 27, 2018 by Joseph Fitsanakis\n\nA senior civil servant in the upper house of the French parliament has been arrested on suspicion of spying for North Korea, according to prosecutors. The news of the suspected spy\u2019s arrest was first reported on Monday by Quotidien, a daily politics and culture show on the Monaco-based television channel TMC. The show cited \u201ca judicial source in Paris\u201d and said that France\u2019s domestic security and counterintelligence agency, the General Directorate for Internal Security (DGSI), was in charge of the espionage case.\n\nThe senior administrator has been identified as Benoit Quennedey, a civil servant who liaises between the French Senate and the Department of Architecture and Heritage, which operates under France\u2019s Ministry of Culture. Quennedey was reportedly detained on Sunday morning and his office in the French Senate was raided by DGSI officers on the same day. Quotidien said that he was arrested on suspicion of \u201ccollecting and delivering to a foreign power information likely to subvert core national interests\u201d. The report did not provide specific information about the type of information that Quennedey is believed to have passed to North Korea. It did state, however, that a counterintelligence investigation into his activities began in March of this year.\n\nQuennedey is believed to be the president of the Franco-Korean Friendship Association, the French branch of a Spanish-based organization that lobbies in favor of international support for North Korea. Korea Friendship Association branches exist in over 30 countries and are believed to be officially sanctioned by Pyongyang. They operate as something akin to the pre-World War II Comintern (Communist International), a Moscow-sanctioned international pressure group that advocated in favor of Soviet-style communism around the world. French media reported on Monday that Quennedey traveled extensively to the Korean Peninsula in the past decade and has written a French-language book on North Korea. News reports said that the French President Emmanuel Macron had been made aware of Quennedey\u2019s arrest. The senior civil servant faces up to 30 years in prison if found guilty of espionage.\n\n\u25ba Author: Joseph Fitsanakis | Date: 27 November 2018 | Permalink\n\n",
5165 "activation_prompt": "Identify the person arrested on suspicion of spying for North Korea. [/INST]",
5166 "expected_completion": "Benoit Quennedey",
5167 }
5168 criterias = ["perplexity"]
5169
5170 context_ids = self.tokenizer(
5171 data["context"],
5172 return_tensors="pt",
5173 truncation=False,
5174 padding=False,
5175 add_special_tokens=False,
5176 )["input_ids"].to(self.device)
5177
5178 context_embeds = self.model.get_input_embeddings()(context_ids).squeeze(0)
5179 compressed_tokens = context_embeds.detach().cpu().numpy().tolist()
5180 compressed_tokens_b64 = ndarray_to_base64(compressed_tokens)
5181 miner_response = MinerResponse(compressed_tokens_b64=compressed_tokens_b64)
5182 ground_truth_request = GroundTruthRequest(
5183 context=data["context"],
5184 activation_prompt=data["activation_prompt"],
5185 expected_completion=data["expected_completion"],
5186 criterias=criterias,
5187 )
5188 request = BatchedScoringRequest(
5189 miner_responses=[miner_response],
5190 ground_truth_request=ground_truth_request,
5191 )
5192 self.get_metrics(request)
5193
5194 ground_truth_request.activation_prompt = (
5195 "Write exactly the same context as provided. [/INST]"
5196 )
5197 ground_truth_request.expected_completion = data["context"]
5198
5199 request = BatchedScoringRequest(
5200 miner_responses=[miner_response],
5201 ground_truth_request=ground_truth_request,
5202 )
5203 self.get_metrics(request)
5204 except Exception as e:
5205 print(f"Error in unit_test: {e}")
5206
5207
5208def generate_answer(
5209 model: AutoModelForCausalLM,
5210 tokenizer: AutoTokenizer,
5211 question_ids: torch.Tensor,
5212 cache: DynamicCache,
5213 context_length: int,
5214 max_new_tokens: int,
5215) -> str:
5216 """
5217 Generate an answer to a question using greedy decoding.
5218
5219 Parameters
5220 ----------
5221 question_ids : torch.Tensor
5222 The tokenized question.
5223 cache : Cache
5224 The compressed key-value cache.
5225 context_length : int
5226 The length of the context.
5227 max_new_tokens : int
5228 The maximum number of new tokens to generate.
5229
5230 Returns
5231 -------
5232 str
5233 The generated answer.
5234 """
5235
5236 cache_seq_lengths = [
5237 cache.get_seq_length(layer_idx) for layer_idx in range(len(cache))
5238 ]
5239 position_ids = torch.arange(
5240 context_length, context_length + question_ids.shape[1], device=model.device
5241 ).unsqueeze(0)
5242
5243    # Prefill the question tokens on top of the compressed KV cache
5244 outputs = model(
5245 input_ids=question_ids.to(model.device),
5246 past_key_values=cache,
5247 position_ids=position_ids,
5248 num_logits_to_keep=1,
5249 )
5250
5251 position_ids = position_ids[:, -1:] + 1
5252 generated_ids = [outputs.logits[0, -1].argmax()]
5253
5254 should_stop_token_ids = model.generation_config.eos_token_id
5255 if not isinstance(should_stop_token_ids, list):
5256 should_stop_token_ids = [should_stop_token_ids]
5257
5258 for i in range(max_new_tokens - 1):
5259 outputs = model(
5260 input_ids=generated_ids[-1].unsqueeze(0).unsqueeze(0),
5261 past_key_values=cache,
5262 position_ids=position_ids + i,
5263 )
5264 new_id = outputs.logits[0, -1].argmax()
5265 generated_ids.append(new_id)
5266 if new_id.item() in should_stop_token_ids:
5267 break
5268 answer = tokenizer.decode(torch.stack(generated_ids), skip_special_tokens=True)
5269
5270 key_attr, value_attr = "key_cache", "value_cache"
5271
5272 setattr(
5273 cache,
5274 key_attr,
5275 [key[:, :, :c] for key, c in zip(getattr(cache, key_attr), cache_seq_lengths)],
5276 )
5277 setattr(
5278 cache,
5279 value_attr,
5280 [
5281 value[:, :, :c]
5282 for value, c in zip(getattr(cache, value_attr), cache_seq_lengths)
5283 ],
5284 )
5285
5286 return answer
5287
5288
5289
5290---
5291File: /services/validator_backend/universal_scoring/app.py
5292---
5293
5294from fastapi import FastAPI
5295import numpy as np
5296from together import Together
5297from typing import List
5298import logging
5299from pydantic import BaseModel
5300from neural_condense_core import logger
5301
5302
5303class TaskData(BaseModel):
5304 formatted_context: str = ""
5305 original_context: str = ""
5306 challenge_questions: List[str] = []
5307 challenge_answers: List[str] = []
5308 formatted_questions: List[str] = []
5309 negative_chunks: List[str] = []
5310 positive_chunks: List[str] = []
5311
5312
5313class UtilData(BaseModel):
5314 compressed_kv_b64: str = ""
5315 compressed_length: int = 0
5316 download_time: float = 0.0
5317 bonus_compress_size: float = 0.0
5318 bonus_time: float = 0.0
5319 local_filename: str = ""
5320
5321
5322class MinerResponse(BaseModel):
5323 filename: str = ""
5324 compressed_context: str = ""
5325
5326
5327class BatchedScoringRequest(BaseModel):
5328 miner_responses: List[MinerResponse] = []
5329 task_data: TaskData = TaskData()
5330 target_model: str = ""
5331 criterias: List[str] = []
5332
5333
5334# logger = logging.getLogger("uvicorn")
5335logger.info("This will show in Universal Validator Backend logs")
5336
5337app = FastAPI()
5338
5339openai_client = Together()
5340
5341
5342@app.post("/get_metrics")
5343async def get_metrics(item: BatchedScoringRequest):
5344 logger.info(f"Received scoring request for model: {item.target_model}")
5345
5346 model = item.target_model
5347 if "Llama-3.1-8B-Instruct" in model:
5348 model = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo-128K"
5349
5350 compressed_contexts = [
5351 item.miner_responses[i].compressed_context
5352 for i in range(len(item.miner_responses))
5353 ]
5354 questions = item.task_data.challenge_questions
5355 ground_truths = item.task_data.challenge_answers
5356 negative_chunks = item.task_data.negative_chunks
5357 positive_chunks = item.task_data.positive_chunks
5358
5359 logger.info(f"Processing {len(compressed_contexts)} miner responses")
5360 logger.info(f"Number of questions: {len(questions)}")
5361 logger.info(f"Number of positive chunks: {len(positive_chunks)}")
5362 logger.info(f"Number of negative chunks: {len(negative_chunks)}")
5363 logger.info(f"Number of compressed contexts: {len(compressed_contexts)}")
5364 existence_scores = []
5365 for i, compressed_context in enumerate(compressed_contexts):
5366 logger.info(
5367 f"Getting existence score for response {i+1}/{len(compressed_contexts)}"
5368 )
5369 score = await get_chunk_existence_score(
5370 compressed_context, positive_chunks, negative_chunks, model
5371 )
5372 logger.info(f"Raw existence score: {score}, type: {type(score)}")
5373 if score is None or np.isnan(score):
5374 logger.error(f"Invalid existence score for response {i+1}")
5375 score = 0.0
5376 existence_scores.append(score)
5377 logger.info(f"Existence score for response {i+1}: {score}")
5378
5379 qa_scores = []
5380 for i, compressed_context in enumerate(compressed_contexts):
5381 logger.info(f"Getting QA score for response {i+1}/{len(compressed_contexts)}")
5382 score = await get_qa_score(compressed_context, questions, ground_truths, model)
5383 logger.info(f"Raw QA score: {score}, type: {type(score)}")
5384 if score is None or np.isnan(score):
5385 logger.error(f"Invalid QA score for response {i+1}")
5386 score = 0.0
5387 qa_scores.append(score)
5388 logger.info(f"QA score for response {i+1}: {score}")
5389
5390 final_scores = []
5391 for i, (existence_score, qa_score) in enumerate(zip(existence_scores, qa_scores)):
5392 logger.info(f"Calculating final score for response {i+1}")
5393 logger.info(f"Using existence_score={existence_score}, qa_score={qa_score}")
5394 if (
5395 existence_score is None
5396 or qa_score is None
5397 or np.isnan(existence_score)
5398 or np.isnan(qa_score)
5399 ):
5400 logger.error(f"Invalid scores for response {i+1}")
5401 final_score = 0.0
5402 else:
5403 final_score = (existence_score + qa_score) / 2
5404 final_scores.append(final_score)
5405 logger.info(f"Final score for response {i+1}: {final_score}")
5406
5407 avg_score = np.mean(final_scores) if final_scores else 0.0
5408 logger.info(f"Completed scoring. Raw scores: {final_scores}")
5409 logger.info(f"Completed scoring. Average final score: {avg_score:.4f}")
5410 return {"metrics": {"accuracy": final_scores}}
5411
5412
5413async def get_qa_score(
5414 compressed_context: str,
5415 questions: List[str],
5416 ground_truths: List[str],
5417 model: str,
5418):
5419 logger.info("Starting QA scoring")
5420 prompt = """
5421You are given a context and a question. Your task is to answer the question based on the context.
5422---CONTEXT---
5423{compressed_context}
5424---QUESTION---
5425{question}
5426---END---
5427Your response should be concise and to the point. Skip any greetings or other irrelevant information.
5428"""
5429 answers = []
5430
5431 for i, question in enumerate(questions):
5432 logger.info(f"Processing question {i+1}/{len(questions)}")
5433 messages = [
5434 {
5435 "role": "user",
5436 "content": prompt.format(
5437 compressed_context=compressed_context, question=question
5438 ),
5439 },
5440 ]
5441 try:
5442 response = openai_client.chat.completions.create(
5443 model=model, messages=messages, temperature=0
5444 )
5445 text = response.choices[0].message.content
5446 answers.append(text)
5447 except Exception as e:
5448 logger.error(f"Error getting answer for question {i+1}: {str(e)}")
5449 raise
5450
5451 judge_prompt = """
5452You are given a set of question, answer, and ground truth. Your task is to determine whether the answer is correct.
5453---QUESTION---
5454{question}
5455---ANSWER---
5456{answer}
5457---GROUND TRUTH---
5458{ground_truth}
5459---END---
5460You only need to output one word: either 'yes' or 'no'. No additional text or explanations are required.
5461An answer is correct if it mentions the important points in the ground truth.
5462"""
5463 scores = []
5464 for i, (question, answer, ground_truth) in enumerate(
5465 zip(questions, answers, ground_truths)
5466 ):
5467 logger.info(f"Judging answer {i+1}/{len(questions)}")
5468 messages = [
5469 {
5470 "role": "user",
5471 "content": judge_prompt.format(
5472 question=question, answer=answer, ground_truth=ground_truth
5473 ),
5474 },
5475 ]
5476 try:
5477 response = openai_client.chat.completions.create(
5478 model=model, messages=messages, temperature=0
5479 )
5480 text = response.choices[0].message.content
5481 text = text.strip().lower()
5482 words = text.split()
5483 result = "yes" in words and not ("no" in words or "not" in words)
5484 scores.append(result)
5485 logger.info(f"Answer {i+1} scored: {result}")
5486 except Exception as e:
5487 logger.error(f"Error judging answer {i+1}: {str(e)}")
5488 raise
5489
5490 if not scores:
5491 logger.warning("No valid scores generated in QA scoring")
5492 return 0.0
5493
5494 score = np.mean(scores)
5495 logger.info(f"Final QA score: {score:.4f}")
5496 return score
5497
5498
5499async def get_chunk_existence_score(
5500 compressed_context: str,
5501 positive_chunks: List[str],
5502 negative_chunks: List[str],
5503 model: str,
5504):
5505 logger.info("Starting chunk existence scoring")
5506 prompt = """
5507You are given a context and a chunk of text. Your task is to determine whether the chunk content is mentioned in the context.
5508---CONTEXT---
5509{compressed_context}
5510---CHUNK---
5511{chunk}
5512---END---
5513Your response should contain exactly one word: either 'yes' or 'no'. No additional text or explanations are required.
5514"""
5515 positive_scores = []
5516 negative_scores = []
5517
5518 for i, chunk in enumerate(positive_chunks):
5519 logger.info(f"Processing positive chunk {i+1}/{len(positive_chunks)}")
5520 messages = [
5521 {
5522 "role": "user",
5523 "content": prompt.format(
5524 compressed_context=compressed_context, chunk=chunk
5525 ),
5526 },
5527 ]
5528 try:
5529 response = openai_client.chat.completions.create(
5530 model=model, messages=messages, temperature=0
5531 )
5532 text = response.choices[0].message.content
5533 text = text.strip().lower()
5534 words = text.split()
5535 result = "yes" in words and not ("no" in words or "not" in words)
5536 positive_scores.append(result)
5537 logger.info(f"Positive chunk {i+1} scored: {result}")
5538 except Exception as e:
5539 logger.error(f"Error processing positive chunk {i+1}: {str(e)}")
5540 raise
5541
5542 for i, chunk in enumerate(negative_chunks):
5543 logger.info(f"Processing negative chunk {i+1}/{len(negative_chunks)}")
5544 messages = [
5545 {
5546 "role": "user",
5547 "content": prompt.format(
5548 compressed_context=compressed_context, chunk=chunk
5549 ),
5550 },
5551 ]
5552 try:
5553 response = openai_client.chat.completions.create(
5554 model=model, messages=messages, temperature=0
5555 )
5556 text = response.choices[0].message.content
5557 text = text.strip().lower()
5558 words = text.split()
5559 result = ("no" in words or "not" in words) and "yes" not in words
5560 negative_scores.append(result)
5561 logger.info(f"Negative chunk {i+1} scored: {result}")
5562 except Exception as e:
5563 logger.error(f"Error processing negative chunk {i+1}: {str(e)}")
5564 raise
5565
5566 if not positive_scores and not negative_scores:
5567 logger.warning("No valid scores generated in chunk existence scoring")
5568 return 0.0
5569
5570 score = np.mean(positive_scores + negative_scores)
5571 logger.info(f"Final existence score: {score:.4f}")
5572 return score
5573
5574
5575
5576---
5577File: /tests/test_miner_backend.py
5578---
5579
5580import requests
5581import argparse
5582from neural_condense_core.common.base64 import base64_to_ndarray
5583
5584# Default values for the base URL and port
5585DEFAULT_HOST = "localhost"
5586DEFAULT_PORT = 8080
5587DEFAULT_API_PATH = "/condense"
5588
5589
5590def get_args():
5591 """
5592 Function to parse command-line arguments for test configuration.
5593 """
5594 parser = argparse.ArgumentParser(description="Test API Endpoints.")
5595 parser.add_argument(
5596 "--host", type=str, default=DEFAULT_HOST, help="API host (default: localhost)"
5597 )
5598 parser.add_argument(
5599 "--port", type=int, default=DEFAULT_PORT, help="API port (default: 8080)"
5600 )
5601 parser.add_argument(
5602 "--api-path",
5603 type=str,
5604 default=DEFAULT_API_PATH,
5605 help="API path (default: /condense)",
5606 )
5607
5608 parser.add_argument(
5609 "--target-model",
5610 type=str,
5611 default="Condense-AI/Mistral-7B-Instruct-v0.1",
5612 )
5613
5614 args, _ = parser.parse_known_args() # Avoid conflict with pytest's arguments
5615 return args
5616
5617
5618# Get arguments from the command line
5619args = get_args()
5620
5621# Construct the base URL using the provided arguments
5622BASE_URL = f"http://{args.host}:{args.port}{args.api_path}"
5623
5624
5625def get_api_url():
5626 """
5627 Function to provide the full API URL based on the host, port, and path.
5628 """
5629 return BASE_URL
5630
5631
5632def test_miner_api():
5633 """
5634 Test the prediction endpoint by sending a valid context and model request.
5635 """
5636 api_url = get_api_url()
5637
5638 payload = {
5639 "context": "This is a long test context that needs to be compressed.",
5640 "target_model": args.target_model,
5641 }
5642
5643 response = requests.post(api_url, json=payload)
5644
5645 if response.status_code != 200:
5646 raise Exception(f"Expected status code 200 but got {response.status_code}")
5647
5648 data = response.json()
5649
5650 if "compressed_tokens_base64" not in data:
5651 raise Exception("Response should contain compressed_tokens_base64.")
5652
5653 compressed_tokens = base64_to_ndarray(data["compressed_tokens_base64"])
5654
5655 seq_len, hidden_size = compressed_tokens.shape
5656
5657 print(f"Compressed tokens shape: {seq_len} x {hidden_size}")
5658
5659 print("API test passed successfully!")
5660
5661
5662
5663---
5664File: /tests/test_synthesizer.py
5665---
5666
5667from transformers import AutoTokenizer
5668from neural_condense_core.validator_utils.synthesizing import ChallengeGenerator
5669import json
5670import os
5671
5672os.makedirs("tmp", exist_ok=True)
5673
5674tokenizer = AutoTokenizer.from_pretrained("unsloth/Mistral-7B-Instruct-v0.2")
5675
5676tasks = [
5677 "question_answering",
5678 "causal_conversation",
5679 "reconstruct_conversation",
5680 "trivial_qa_conversation",
5681]
5682
5683challenge_generator = ChallengeGenerator()
5684
5685for task in tasks:
5686 challenge = challenge_generator.generate_challenge(tokenizer, task=task)
5687 json.dump(challenge.deserialize(), open(f"tmp/challenge_{task}.json", "w"))
5688
5689
5690
5691---
5692File: /tests/test_synthetic.py
5693---
5694
5695import time
5696import json
5697import numpy as np
5698from tqdm import tqdm
5699from transformers import AutoTokenizer
5700from neural_condense_core import validator_utils
5701import traceback
5702import asyncio
5703
5704
5705def benchmark_challenger(
5706 n_iterations: int = 10,
5707 max_characters: int = 10000,
5708 model_name: str = "Condense-AI/Mistral-7B-Instruct-v0.2",
5709):
5710 """
5711 Benchmark the Challenger model's response times and dataset creation for various tasks.
5712
5713 Args:
5714 n_iterations (int): Number of iterations per task to perform. Defaults to 10.
5715 max_characters (int): Maximum character limit for context in each task. Defaults to 10000.
5716 model_name (str): The name of the model to use for tokenization. Defaults to "Condense-AI/Mistral-7B-Instruct-v0.2".
5717
5718 Returns:
5719 dict: Summary of benchmark results including average time per task and statistics on context length.
5720 """
5721 # Load tokenizer and initialize Challenger instance
5722 if model_name == "universal":
5723 tokenizer = AutoTokenizer.from_pretrained("gpt2")
5724 else:
5725 tokenizer = AutoTokenizer.from_pretrained(model_name)
5726 challenger = validator_utils.synthesizing.ChallengeGenerator(None)
5727
5728 # Define task types and initialize logs
5729 tasks = [
5730 "question_answering",
5731 # "causal_conversation",
5732 # "reconstruct_conversation",
5733 # "trivial_qa_conversation",
5734 ]
5735 time_logs = {task: 0 for task in tasks}
5736 error_count = 0
5737 dataset_items = []
5738 context_lengths = []
5739 token_counts = []
5740
5741 # Start progress bar for total iterations
5742 total_iterations = n_iterations * len(tasks)
5743 pbar = tqdm(total=total_iterations, desc="Benchmarking", unit="task")
5744
5745 for i in range(n_iterations):
5746 for task in tasks:
5747 try:
5748 start_time = time.time()
5749
5750 # Generate protocol using Challenger
5751 protocol = asyncio.run(
5752 challenger.generate_challenge(
5753 model_name=model_name,
5754 task=task,
5755 max_context_length_in_chars=max_characters,
5756 )
5757 )
5758
5759 # Record details of the generated sample
5760 item = {
5761 "task": task,
5762 "id": i,
5763 "data": protocol.validator_payload,
5764 "model_id": model_name,
5765 "max_characters": max_characters,
5766 }
5767
5768 # Track time taken for task
5769 time_logs[task] += time.time() - start_time
5770
5771 # Store context length and token count for analysis
5772 context_lengths.append(
5773 len(item["data"]["task_data"]["formatted_context"])
5774 )
5775 tokens = tokenizer.encode(
5776 item["data"]["task_data"]["formatted_context"]
5777 )
5778 token_counts.append(len(tokens))
5779
5780 # Add item to dataset items
5781 dataset_items.append(item)
5782
5783 except Exception as e:
5784 traceback.print_exc()
5785 print(f"Error during task '{task}' at iteration {i}: {e}")
5786 error_count += 1
5787 continue
5788
5789 # Update progress bar
5790 pbar.update(1)
5791
5792 # Close progress bar
5793 pbar.close()
5794
5795 # Calculate average processing time per task
5796 avg_time_logs = {
5797 task: total_time / n_iterations for task, total_time in time_logs.items()
5798 }
5799 error_rate = error_count / total_iterations
5800
5801 # Display benchmark summary
5802 print("\nBenchmark Summary:")
5803 print(f"Error count: {error_count}")
5804 print(f"Error rate: {error_rate:.2%}")
5805 print("Average processing times (seconds):", avg_time_logs)
5806
5807 # Analyze context lengths and tokens
5808 context_lengths = np.array(context_lengths)
5809 token_counts = np.array(token_counts)
5810 mean_length = context_lengths.mean()
5811 std_length = context_lengths.std()
5812 mean_tokens = token_counts.mean()
5813 std_tokens = token_counts.std()
5814
5815 print("\nContext length statistics:")
5816 print(f"Mean: {mean_length:.2f} characters")
5817 print(f"Standard Deviation: {std_length:.2f} characters")
5818 print("\nToken count statistics:")
5819 print(f"Mean: {mean_tokens:.2f} tokens")
5820 print(f"Standard Deviation: {std_tokens:.2f} tokens")
5821
5822 # Save dataset items to JSON file
5823 with open("synthetic_samples.json", "w") as file:
5824 json.dump(dataset_items, file)
5825
5826 # Return expanded summary of results
5827 return {
5828 "error_count": error_count,
5829 "error_rate": error_rate,
5830 "avg_time_per_task": avg_time_logs,
5831 "context_length_mean": mean_length,
5832 "context_length_std": std_length,
5833 "token_count_mean": mean_tokens,
5834 "token_count_std": std_tokens,
5835 }
5836
5837
5838# Run benchmark
5839benchmark_results = benchmark_challenger(
5840 n_iterations=100,
5841 max_characters=40000,
5842 model_name="universal",
5843)
5844
5845print("\nBenchmarking completed. Results:", benchmark_results)
5846
5847
5848
5849---
5850File: /tests/upload_synthetic_dataset.py
5851---
5852
5853from datasets import Dataset
5854import pandas as pd
5855import json
5856
5857
5858data = json.load(open("synthetic_samples.json"))
5859
5860dataset = Dataset.from_pandas(pd.DataFrame(data))
5861
5862dataset.push_to_hub("Condense-AI/synthetic-samples-v0.1")
5863
5864
5865
5866---
5867File: /auto_update.sh
5868---
5869
5870#!/bin/bash
5871
5872# Function to update repository
5873update_repo() {
5874 echo "Checking for updates..."
5875
5876 # Fetch latest changes without merging
5877 git fetch origin
5878
5879 # Get current and remote hash
5880 LOCAL_HASH=$(git rev-parse HEAD)
5881 REMOTE_HASH=$(git rev-parse origin/main)
5882
5883 if [ "$LOCAL_HASH" != "$REMOTE_HASH" ]; then
5884 echo "Updates available. Updating repository..."
5885
5886 # Stash any local changes
5887 git stash
5888 git checkout main
5889 # Pull latest changes
5890 git pull origin main
5891 git reset --hard origin/main
5892
5893 # Reinstall dependencies
5894 uv sync --prerelease=allow
5895
5896 pm2 restart condense_validator_backend
5897 pm2 restart condense_validator
5898
5899 # check if condense_validator is errored, if yes, restart
5900 if [ "$(pm2 jlist | jq '.[] | select(.name=="condense_validator") | .pm2_env.status' -r)" == "errored" ]; then
5901 echo "condense_validator is in error state, restarting..."
5902 pm2 restart condense_validator
5903 fi
5904
5905 echo "Update completed successfully!"
5906 else
5907 echo "Repository is up to date!"
5908 fi
5909}
5910
5911# Run the update function
5912while true; do
5913 update_repo
5914 sleep 1800 # Sleep for 30 minutes (1800 seconds)
5915done
5916
5917
5918
5919---
5920File: /README.md
5921---
5922
5923<div align="center">
5924<picture>
5925 <source srcset="./assets/images/condense-main.png">
5926 <img src="./assets/images/condense-main.png" alt="Neural Condense Subnet">
5927
5928</picture>
5929</div>
5930
5931<div align="center">
5932
5933<pre>
5934
5935 β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•—
5936β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ•”β•β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ•”β•β•β•β•β• β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•‘
5937β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β–ˆβ–ˆβ•— β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•”β–ˆβ–ˆβ•— β–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘
5938β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β•šβ–ˆβ–ˆβ•—β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β•β•β• β–ˆβ–ˆβ•‘β•šβ–ˆβ–ˆβ•—β–ˆβ–ˆβ•‘β•šβ•β•β•β•β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β•β•β• β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘
5939β•šβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β•šβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ•‘ β•šβ–ˆβ–ˆβ–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ•‘ β•šβ–ˆβ–ˆβ–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•‘ β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘
5940 β•šβ•β•β•β•β•β• β•šβ•β•β•β•β•β• β•šβ•β• β•šβ•β•β•β•β•šβ•β•β•β•β•β• β•šβ•β•β•β•β•β•β•β•šβ•β• β•šβ•β•β•β•β•šβ•β•β•β•β•β•β•β•šβ•β•β•β•β•β•β• β•šβ•β• β•šβ•β•β•šβ•β•
5941
5942</pre>
5943
5944</div>
5945
5946<div align="center">
5947
5948<h2>πŸ’‘ Explore Our Ecosystem πŸ’‘</h2>
5949
5950| Component | Link |
5951|------------------------------------------|-------------------------------------------------------------------|
5952| 🌐 **Condense-AI & API Document** | [Visit Condense-AI](https://condenses.ai) |
5953| πŸ“š **API Library** | [Explore API Library](https://github.com/condenses/neural-condense) |
5954| πŸ”— **Organic Forwarder For Validators** | [Check Organic Forwarder](https://github.com/condenses/subnet-organic) |
5955| πŸ“Š **Miner Leaderboard & Statistics** | [View Miner Dashboard](https://dashboard.condenses.ai) or [Wandb Logger](https://wandb.ai/toilaluan/Neural-Condense-Subnet) |
5956
5957</div>
5958
5959---
5960
5961## Changelogs
5962- (25/11/2024) Version 0.0.2 update: added condensing activation layers, switched from RESTful API transfer to distributed storage, and emissions are now allocated only to the top 30% of miners.
5963
5964
5965## 🌟 Key Features:
5966
5967### ⚡ Subnet as an Acceleration Adapter for LLM Inference
5968- 🌐 **Seamless Integration**: Effortlessly integrates with LLM inference engines such as transformers 🤗 and vLLM.
5969- 🧩 **Token Compression**: The subnet API compresses long sequences of natural language tokens into soft tokens (see the illustrative sketch after this list).
5970- πŸ›οΈ **Decentralized Network**: The subnet is a decentralized network that allows miners to contribute to the compression process.
5971- 📊 **Tiered System**: The subnet has a tiered system, with a research tier for experimentation and an inference tier for production-scale use. Incentive distribution is split per tier.
5972- πŸ“ **Benchmarking and Validation**: The subnet owner defines synthetic metrics to benchmark miners’ performance, ensuring quality and efficiency.
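
For illustration only (this block is an editor's sketch, not part of the repository files listed above): a minimal example of how a client might call a miner's condense endpoint, based on the request/response shape exercised in tests/test_miner_backend.py. The host, port, path, and target model are the defaults assumed in that test and may differ in a real deployment.

```python
import requests
from neural_condense_core.common.base64 import base64_to_ndarray

# Defaults mirrored from tests/test_miner_backend.py (assumptions; adjust for your setup).
API_URL = "http://localhost:8080/condense"

payload = {
    "context": "A long context that should be compressed into soft tokens...",
    "target_model": "Condense-AI/Mistral-7B-Instruct-v0.1",
}

response = requests.post(API_URL, json=payload)
response.raise_for_status()

# The miner returns the compressed representation as a base64-encoded ndarray
# under the "compressed_tokens_base64" key.
compressed_tokens = base64_to_ndarray(response.json()["compressed_tokens_base64"])
print("Compressed tokens shape:", compressed_tokens.shape)
```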
5973
5974<div align="center">
5975<img src="https://github.com/user-attachments/assets/87060854-57bd-4b9b-9b06-b1edf87d182a" alt="condense" width="75%">
5976</div>
5977
5978### βš™οΈ Node Tiers
5979
5980
5981| **Tier** | **Purpose** | **Context Size** | **Incentive Percentage** | **Supporting Models** |
5982|----------------|---------------------------------------|---------------------------|---------------|--------------------------------------|
5983| `research` | Optimize text-to-kv cache for a specific model | Up to 15000 characters | `60%` | `mistralai/Mistral-7B-Instruct-v0.2` |
5984| `universal` | Compress text representation for various models | Up to 15000 characters | `40%` | `meta-llama/Llama-3.1-8B-Instruct` |
5985
5986
5987*Supporting models can be added flexibly based on tailored needs.*
5988
5989<div align="center">
5990<img src="https://github.com/user-attachments/assets/b661ed8e-fc8a-45e3-ad78-6001dae93b21" alt="realese-circle" width="75%">
5991</div>
5992
5993
5994## πŸ“š Documentation
5995- **Setup for miners**: [Miner Setup](./docs/miner.md)
5996- **Setup for validators**: [Validator Setup](./docs/validator.md)
5997- **Mechanism**: [Mechanism](./docs/mechanism.md)
5998