js怎么获取别人的ip地址吗 2025-06-14 05:58:49
巴西队终于复仇了德国,可惜“足球爷爷”看不到了 2025-06-12 01:44:39
wow牧师名字(精选600个) 2025-06-29 02:27:28
​好看的盗墓电影 盗墓必看十部电影分享 2025-05-15 07:55:10
一的成语 2025-05-09 04:34:19
2025年JS/AJAX十大网站排行榜 2025-06-25 04:21:57
史上超全法师BB攻略 | 组建自己的BB天团!· 传奇永恒 2025-05-29 19:30:01
如何用数学课件制作工具画球体 2025-05-18 19:30:58
LPL4月17日QGvsEDG QG为什么直接输掉比赛 2025-06-30 15:19:58
电子邮件查询 2025-06-23 15:37:09

每天40分玩转Django:Django性能优化

Django性能优化

一、性能优化要点总览表

优化类别具体措施预期效果数据库优化索引优化、查询优化、N+1问题提升查询速度、减少数据库负载缓存优化Redis缓存、页面缓存、查询缓存减少数据库访问、提升响应速度异步处理Celery任务队列、异步视图、异步ORM提升并发性能、优化用户体验

二、数据库优化实现

1. 模型设计和索引优化

# models.py

from django.db import models

from django.db.models import Index

class Category(models.Model):

name = models.CharField(max_length=100)

slug = models.SlugField(unique=True)

class Meta:

indexes = [

models.Index(fields=['name']),

models.Index(fields=['slug'])

]

class Article(models.Model):

title = models.CharField(max_length=200)

content = models.TextField()

category = models.ForeignKey(Category, on_delete=models.CASCADE)

author = models.ForeignKey('auth.User', on_delete=models.CASCADE)

created_at = models.DateTimeField(auto_now_add=True)

updated_at = models.DateTimeField(auto_now=True)

views = models.IntegerField(default=0)

class Meta:

indexes = [

models.Index(fields=['created_at', '-views']),

models.Index(fields=['category', 'author']),

]

2. 查询优化

# views.py

from django.db.models import Prefetch, Count, Q

from django.views.generic import ListView

class ArticleListView(ListView):

model = Article

template_name = 'articles/article_list.html'

context_object_name = 'articles'

def get_queryset(self):

# 使用select_related减少外键查询

queryset = Article.objects.select_related(

'category', 'author'

).prefetch_related(

'tags',

Prefetch(

'comments',

queryset=Comment.objects.select_related('user')

)

)

# 添加过滤条件

category = self.request.GET.get('category')

if category:

queryset = queryset.filter(category__slug=category)

# 添加聚合查询

queryset = queryset.annotate(

comment_count=Count('comments'),

like_count=Count('likes')

)

return queryset.order_by('-created_at')

3. 批量操作优化

# utils.py

from django.db import transaction

def bulk_create_articles(articles_data):

"""批量创建文章"""

with transaction.atomic():

articles = [

Article(

title=data['title'],

content=data['content'],

category_id=data['category_id'],

author_id=data['author_id']

)

for data in articles_data

]

return Article.objects.bulk_create(articles)

def bulk_update_views(article_ids):

"""批量更新文章浏览量"""

with transaction.atomic():

Article.objects.filter(id__in=article_ids).update(

views=models.F('views') + 1

)

三、缓存优化实现

1. Redis缓存配置

# settings.py

CACHES = {

'default': {

'BACKEND': 'django_redis.cache.RedisCache',

'LOCATION': 'redis://127.0.0.1:6379/1',

'OPTIONS': {

'CLIENT_CLASS': 'django_redis.client.DefaultClient',

}

}

}

# 使用Redis作为Session后端

SESSION_ENGINE = 'django.contrib.sessions.backends.cache'

SESSION_CACHE_ALIAS = 'default'

2. 视图缓存实现

# views.py

from django.views.decorators.cache import cache_page

from django.utils.decorators import method_decorator

from django.core.cache import cache

from django.conf import settings

@method_decorator(cache_page(60 * 15), name='dispatch')

class CategoryListView(ListView):

model = Category

template_name = 'categories/category_list.html'

class ArticleDetailView(DetailView):

model = Article

def get_object(self):

article_id = self.kwargs['pk']

cache_key = f'article_{article_id}'

# 尝试从缓存获取

article = cache.get(cache_key)

if article is None:

# 缓存未命中,从数据库获取

article = super().get_object()

# 存入缓存

cache.set(cache_key, article, timeout=60*30)

return article

def get_popular_articles():

cache_key = 'popular_articles'

articles = cache.get(cache_key)

if articles is None:

articles = Article.objects.annotate(

total_score=Count('likes') + Count('comments')

).order_by('-total_score')[:10]

cache.set(cache_key, articles, timeout=60*60)

return articles

3. 缓存装饰器

# decorators.py

from functools import wraps

from django.core.cache import cache

def cache_result(timeout=300):

def decorator(func):

@wraps(func)

def wrapper(*args, **kwargs):

# 生成缓存键

cache_key = f"{func.__name__}:{args}:{kwargs}"

result = cache.get(cache_key)

if result is None:

result = func(*args, **kwargs)

cache.set(cache_key, result, timeout=timeout)

return result

return wrapper

return decorator

# 使用示例

@cache_result(timeout=60*5)

def get_article_stats(article_id):

return {

'views': Article.objects.get(id=article_id).views,

'comments': Comment.objects.filter(article_id=article_id).count(),

'likes': Like.objects.filter(article_id=article_id).count()

}

四、异步处理实现

1. Celery配置和任务

# celery.py

from celery import Celery

import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

# tasks.py

from celery import shared_task

from django.core.mail import send_mail

from .models import Article

@shared_task

def send_article_notification(article_id):

article = Article.objects.get(id=article_id)

subscribers = article.category.subscribers.all()

for subscriber in subscribers:

send_mail(

f'新文章: {article.title}',

f'查看最新文章:{article.get_absolute_url()}',

'noreply@example.com',

[subscriber.email],

fail_silently=False,

)

@shared_task

def update_article_stats():

"""定期更新文章统计信息"""

for article in Article.objects.all():

stats = get_article_stats(article.id)

article.stats_cache = stats

article.save()

2. 异步视图实现

# views.py

from django.http import JsonResponse

from asgiref.sync import sync_to_async

from channels.layers import get_channel_layer

from asgiref.sync import async_to_sync

async def async_article_detail(request, pk):

article = await sync_to_async(Article.objects.get)(id=pk)

context = {

'title': article.title,

'content': article.content,

'author': article.author.username

}

return JsonResponse(context)

class CommentCreateView(View):

def post(self, request, article_id):

comment = Comment.objects.create(

article_id=article_id,

user=request.user,

content=request.POST['content']

)

# 异步发送WebSocket通知

channel_layer = get_channel_layer()

async_to_sync(channel_layer.group_send)(

f"article_{article_id}",

{

"type": "comment.notification",

"message": {

"comment_id": comment.id,

"user": comment.user.username,

"content": comment.content

}

}

)

return JsonResponse({'status': 'success'})

3. 异步中间件

# middleware.py

from django.core.cache import cache

from asgiref.sync import sync_to_async

class AsyncCacheMiddleware:

def __init__(self, get_response):

self.get_response = get_response

async def __call__(self, request):

cache_key = f"page_cache:{request.path}"

response = await sync_to_async(cache.get)(cache_key)

if response is None:

response = await self.get_response(request)

await sync_to_async(cache.set)(

cache_key,

response,

timeout=300

)

return response

五、性能监控

1. 查询日志记录

# middleware.py

import time

import logging

from django.db import connection

logger = logging.getLogger(__name__)

class QueryCountMiddleware:

def __init__(self, get_response):

self.get_response = get_response

def __call__(self, request):

start_time = time.time()

initial_queries = len(connection.queries)

response = self.get_response(request)

total_time = time.time() - start_time

total_queries = len(connection.queries) - initial_queries

if total_queries > 10: # 设置查询数量阈值

logger.warning(

f'Path: {request.path} - '

f'Queries: {total_queries} - '

f'Time: {total_time:.2f}s'

)

return response

六、性能优化流程图

最佳实践建议:

数据库优化:

合理使用索引避免N+1查询问题使用批量操作替代循环操作定期分析和优化慢查询 缓存策略:

合理设置缓存时间分层缓存架构及时更新缓存避免缓存雪崩 异步处理:

合理使用任务队列异步处理耗时操作适当使用异步视图监控任务执行状态

这就是关于Django性能优化的详细内容。通过实践这些优化策略,你可以显著提升Django应用的性能。如果有任何问题,欢迎随时提出!

怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!