添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
pip install pika

这里需要注意的是: pika官网明确说明 pika==0.11.0版本只支持python2.6以前的版本
重点: 在下载时可以进入官网确定你的版本所需要的pika版本号。
pika官网地址https://pypi.org/project/pika/
在这里插入图片描述

2,实现rabbitmq基础模块类的编写

这里实现了Rabbitmq对象初始化、连接mq、发送mq消息、阻塞监听消息并回调。
本模块代码作为rabbitmq基础模块类,为业务模块调用提供方法。
对于代码的详解已经写道注释中了。

这里的一个connection就是一个tcp连接。为了提升tcp连接复用性,在每个连接基础上可以建立多个channel信道,每个信道都会被指派一个唯一的 ID。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。但考虑到如果数据量过大,会导致连接阻塞,最终这里选择一个connect连接只对应了一个channel信道。
关于RabbitMQ 中 Connection 和 Channel 详解:https://www.cnblogs.com/eleven24/p/10326718.html
类似于下图:
在这里插入图片描述
模块代码如下:

import pika
import time
import logging
logger = logging.getLogger('mydjango')
# import sys
from django.conf import settings
from retrying import retry
class RabbitmqServer(object):
    def __init__(self,username,password,serverip,port,virtual_host):
        self.username =username
        self.password = password
        self.serverip = serverip
        self.port = port
        self.virtual_host = virtual_host
    # def connent(self):
    #    for i in range(3):
    #         try:
    #             logger.info("into mq connet")
    #             user_pwd = pika.PlainCredentials(self.username,self.password)
    #             logger.info("create mq ...")
    #             logger.info("%s,%s,%s,%s,%s"%(self.virtual_host,self.serverip,self.port,self.password,self.username))
    #             s_conn = pika.BlockingConnection(pika.ConnectionParameters(virtual_host=self.virtual_host,host= self.serverip,port=self.port, credentials=user_pwd))  # 创建连接
    #             logger.info('create channel...')
    #             self.channel = s_conn.channel()
    #             logger.info('connect successful')
    #             break
    #         except Exception as e:
    #             logger.info("连接mq失败,沉睡10s再试,共沉睡三次,失败原因:%s",e)
    #             time.sleep(10)
    @retry(stop_max_delay=30000, wait_fixed=5000)
    def connent(self):
        logger.info("into mq connet")
        user_pwd = pika.PlainCredentials(self.username, self.password)
        logger.info("create mq ...")
        logger.info("%s,%s,%s,%s,%s" % (self.virtual_host, self.serverip, self.port, self.password, self.username))
        # 创建 mq连接
        s_conn = pika.BlockingConnection(
            pika.ConnectionParameters(virtual_host=self.virtual_host, host=self.serverip, port=self.port,
                                      credentials=user_pwd))  
        logger.info('create channel...')
        self.channel = s_conn.channel()
        logger.info('connect successful')
    def productMessage(self,queuename,message):
        self.channel.queue_declare(queue=queuename, durable=True)
        self.channel.basic_publish(exchange='',
                              routing_key=queuename,#写明将消息发送给队列queuename
                              body=message,    #要发送的消息
                              properties=pika.BasicProperties(delivery_mode=2,)#设置消息持久化,将要发送的消息的属性标记为2,表示该消息要持久化
    def expense(self,queuename,func):
        :param queuename: 消息队列名称
        :param func: 要回调的方法名
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
                            func,
                            queue=queuename,
        self.channel.start_consuming()
def callback(ch, method, properties, body):
    print(" [消费者] Received %r" % body)
    time.sleep(1)
    print(" [消费者] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)#  接收到消息后会给rabbitmq发送一个确认
if __name__ != '__main__':   # 测试服务是否能启动时使用
    from django.conf import settings
    # username = settings.RABBITMQCONFIG.get("username")
    # password = settings.RABBITMQCONFIG.get("password")
    # severip = settings.RABBITMQCONFIG.get("severip")
    # port = settings.RABBITMQCONFIG.get("port")
    username,password,severip,port,virtual_host = settings.PONEDITOR_RMQ_USER,settings.PONEDITOR_RMQ_PASSWD,settings.PONEDITOR_RMQ_IP,\
                                                  settings.PONEDITOR_RMQ_PORT,settings.PONEDITOR_RMQ_VIRHOST
    RabbitmqClient = RabbitmqServer(username,password,severip,port,virtual_host)
if __name__ == '__main__':
    import json
    RabbitmqClient = RabbitmqServer("root", "ssb@2019",'172.31.0.54',5673,"YuXinIBTool")
    RabbitmqClient.connent()
    data = {"code":3}
    RabbitmqClient.productMessage("test3",json.dumps(data))
    RabbitmqClient.expense("test3",callback)

二,django中多进程开启Rabbitmq声明队列、发送消息、阻塞监听

此模块中将使用多进程,为方便结束开启的多个进程,这里使用kill -15 方法删掉进程。
kill -9 和kill -15的区别:https://www.cnblogs.com/domestique/p/8241219.html

from django.core.management.base import BaseCommand
from utils.echo_display import zip_unzip
import os
import json
import pymysql
import pandas as pd
import time
from django.conf import settings
# from utils.structure_modify import modify
from utils.annual_entrusted_modify import modify_annual
import signal
import multiprocessing
# import threading
class Command(BaseCommand):
    def handle(self, *args, **options):
        import logging
        logger = logging.getLogger('mydjango')
        from utils.Rabbitmqserver import RabbitmqClient
        from django.conf import settings
        import json
        def parse_result_func(ch, method, properties, body):
            ### 逻辑程序
            logger.info("%s start to Analytical data..." %(queue_name))
            logger.info(" [接收到的请求头] Received %r [接收到的请求体] Received %r" % (properties.headers, body))
            try:
                req_res = json.loads(body)
                req_head = dict(properties.headers)
                project_id = str(req_res["projectId"])
                logger.info("Analytical data successful")
            except Exception as e:
                logger.error("there is a failed cause : rabbitmq parameter not correct %s"%(e) )
                logger.error("failed info -- properties : %s  body : %s"%(properties,body))
                return
            try:
                logger.info("start logical")
                finlall_docx_name = modify_annual.main(body)
                logger.info("logical successful")
            except Exception as e:
                logger.error(str(body))
                logger.info("send a reject to rabbitmq queue : %s" % (queue_name))
                # 接收到消息后会给rabbitmq发送一个拒绝
                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)  
                logger.info("reject process is end")
            else:
                logger.info("send a ack to rabbitmq queue : %s" % (queue_name))
                # 接收到消息后会给rabbitmq发送一个确认
                ch.basic_ack(delivery_tag=method.delivery_tag)  
        def term(sig_num, addtion):
            logger.info("stop the process current pid is %s ,group id is %s" % (os.getpid(), os.getpgrp()))
            os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
        def func(args):
            arguments = {
                'x-dead-letter-exchange': "exchange.e50.oneditor",  # 延迟结束后指向交换机(死信收容交换机)
                'x-dead-letter-routing-key': "rkey.oneditor5.dlx",  # 延迟结束后指向队列(死信收容队列),可直接设置queue name也可以设置routing-key
            logger.info("into %s sub-process pid is %s ,group id is %s"%(args,os.getpid(), os.getpgrp()))
            global queue_name
            queue_name = args
            logger.info("start to connect the RabbitmqClient")
            # 连接mq,建立connention和channel
            RabbitmqClient.connent()
            # 声明队列
            RabbitmqClient.channel.queue_declare(queue=args, durable=True,arguments=arguments)
            logger.info("connect the RabbitmqClient successful")
            # 调用回调函数并祖泽监听队列
            RabbitmqClient.expense(args, parse_result_func)
        logger.info("into listening logical --queue_listener--")
        # 接受kill -15 消息后,调用term函数
        signal.signal(signal.SIGTERM, term)
        logger.info("current pid is %s ,group id is %s" % (os.getpid(), os.getpgrp()))
		# 开启多进程
        processes_list = []
        listenque_list = ["queue.p"+str(i)+".oneditor.docx" for i in range(10)]
        # listenque_list = ["queue.p0.oneditor.docx"]
        for listenque in listenque_list:
            t = multiprocessing.Process(target=func, args=(listenque,))
            t.daemon = True
            t.start()
            processes_list.append(t)
        for p in processes_list:
            p.join()
###########################################################
        # print("current pid is % s" % os.getpid())
        # processes_list = []
        # listenque_list = ["queue.p1.oneditor.docx","queue.p2.oneditor.docx"]
        # # listenque_list = ["queue.p"+str(i)+".oneditor.docx" for i in range(2)]
        # print(listenque_list)
        # for listenque in listenque_list:
        #     t = multiprocessing.Process(target=func, args=(listenque,))
        #     t.daemon = True
        #     t.start()
        #     processes_list.append(t)
        # time.sleep(2)
        # try:
        #     for p in processes_list:
        #         p.join()
        # except Exception as e:
        #     print(str(e))
##########################################################
        # t1 = threading.Thread(target=func, args=("queue.p1.oneditor.docx",))
        # t2 = threading.Thread(target=func, args=("queue.p2.oneditor.docx",))
        # t1.start()
        # t2.start()
#########################################################
        # class Mythreadsend1(threading.Thread):
        #     def run(self):
        #         logger.info("start to listening the info...")
        #         RabbitmqClient.expense("queue.p1.oneditor.docx", parse_result_func)
        # class Mythreadsend2(threading.Thread):
        #     def run(self):
        #         logger.info("start to listening the info...")
        #         RabbitmqClient.expense("queue.p2.oneditor.docx", parse_result_func)
        # t1 = Mythreadsend1()
        # t2 = Mythreadsend2()
        # t1.start()
        # t2.start()

三,django中使用manage.py开启独立进程

参考如下:https://blog.csdn.net/luslin1711/article/details/87885145

1、开发时会使用django环境进行一些初始化操作,这些程序一般只执行几次,但是需要django中的环境变量。
2、使用django运行阻塞监听的程序,比如Rabbitmq监听,放在主程序中就阻塞住了,需要另外开命令执行

在创建的app下创建文件夹management,在management文件夹下创建文件夹commands,将要执行的文件放到文件将爱下,记得把__init__.py文件一并创建了,init.py是声明这个文件夹是一个包。然后在主目录(就是manage.py文件所在目录)执行 python manage.py 文件名即可

# 终端前台开启
python manage.py queue_listener
# 终端后台开启
nohup python manage.py queue_listener&
# 终端后台开启,并将打印log放入“黑洞”中
nohup python manage.py queue_listener > /dev/null 2>&1&
# 查看进程pid
ps -aux | grep python
				
在MQ之前,我一直使用的redis作为中间人broker然后用celery执行耗时任务。从未在python项目中使用过MQ。所以今天就在django中用RabbitMQ取代redis+celery。 django中使用RabbitMQ: 本次使用RabbitMQ完成点击注册后给用户发送激活邮件的场景,如果不使用MQ,或者不使用异步的方式,后端就会一直等待smtp服务器把邮件发到用户邮箱后才往下走(...
解决问题: 1、开发时会使用django环境进行一些初始化操作,这些程序一般只执行几次,但是需要django中的环境变量。 2、使用django运行阻塞监听的程序,比如Rabbitmq监听,放在主程序中就阻塞住了,需要另外开命令执行。 │ ├── MyRab # app名称 │ │ ├── admin.py │ │ ├── apps.py │ │ ├── __init__.py │ │ ├── management # 这个就是创建的 management 文
解决问题: 1、开发时会使用django环境进行一些初始化操作,这些程序一般只执行几次,但是需要django中的环境变量。 2、使用django运行阻塞监听的程序,比如Rabbitmq监听,放在主程序中就阻塞住了,需要另外开命令执行 一、使用 django环境运行单独命令 在创建的app下创建文件夹management,在management文件夹下创建文件夹commands,将要执行的文件放到文...
from python_logging_rabbitmq import RabbitMQHandler 或(由于 ) from python_logging_rabbitmq import RabbitMQHandlerOneWay RabbitMQHandler 用于将日志发送到RabbitMQ的基本处理程序。 使用配置的交换,每条记录将直接传递到RabbitMQ。 RabbitMQHa
django-rabbitmq-celery-docker-example 使用rabbitmq和celery在django中实现dockerized示例pub / sub类型消息队列的示例。 有关中型故事的详细解释,请参见: 发布/订阅角色定义 发布者:声明了一个单独的RabbitMQ队列生成器,并将其添加到Celery的默认producer_pool中,该默认生成器被拉出并用于在Celery任务中将新消息发布到该队列。 消费者:定义了自定义消费者类别并将其附加到Celery。 该类订阅了使用上面单独的队列生成器创建/声明的自定义队列。 在自定义使用者类中定义了handle_message回调函数,以便每次将消息发布到该特定队列时,都会调用使用者的回调,该回调使用该消息并将确认发送给RabbitMQ。 需要安装docker和docker-compose才能运行此项目。 可以通过
https://github.com/gocloudcoder/gopher-road/tree/main/middlewares 参考链接: http://rabbitmq.mr-ping.com/ 在此之前我们必须理解几个概念。 什么是中间件? 什么是单体架构以及什么是分布式架构? 什么是同步调用?什么是异步调用? 需要安装setuptools 57.5.0版本 安装的erlang为23.2版本(或者自己去网上找和rabbitmq对应的版本) 安装的rabbitmq版本为3.9.7版本(或者自己去网上找和erlang对应的版本) 使用的celery包尽量使用3.1.18 django版本最好也使用2.1.8以下版本,不然可能定时任务无法使用 #在celery 3.1.18中引入定时模块,或者提前 在Django Web平台开发中,碰到一些请求执行的任务时间较长的情况,为了加快用户的响应时间,就可以采用Celery异步任务的方式来解决 好文章 记得收藏+点赞+关注额 !!! ---- Nick.Peng 二、关于Celery Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调... celery 和 django-celery,celery 是 分布式任务队列,发邮件我们还需要通信。 celery 是支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。Celery的架构,采用典型的生产者-消费者模式: 主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。 Celery...
Django是一个基于Python的开源Web框架,而RabbitMQ是一个开源的消息中间件。它们可以一起使用来构建可扩展的、高性能的Web应用程序。 在Django中使用RabbitMQ可以实现异步任务处理、消息队列、事件驱动等功能。通过将任务发送到RabbitMQ消息队列中,可以异步处理一些耗时的任务,避免阻塞Web请求。 要在Django中使用RabbitMQ,首先需要安装RabbitMQ,并在Django项目中安装相应的Python库,如pika或django-rabbitmq。 接下来,你可以创建一个消息队列,并在Django中定义任务任务可以是一些需要异步执行的操作,例如发送电子邮件、生成报告等。当需要执行这些任务时,可以将它们发送到RabbitMQ队列中。 你还需要一个消费者来监听RabbitMQ队列,并在有新任务进行处理。消费者可以是一个Django管理命令,或者是一个独立的Python脚本。消费者从队列中获取任务,并按照定义的逻辑进行处理。 通过使用RabbitMQ和Django结合,你可以实现更高效的任务处理和异步操作,提升Web应用程序的性能和可靠性。
(排坑)ERROR Unexpected exception, exiting abnormally (org.apache.zookeeper.server.ZooKeeperServerMain) 12533 MOOJ: 这篇文章对 GPT 系列(包括 GPT、GPT-2、GPT-3)和 InstructGPT 的发展历程进行了深入而清晰的阐述。作者对这一技术路线的进化进行了全面总结,展现了 GPT 系列从初代到最新版本的演变过程和创新点。这种逐步的技术进步确实为自然语言处理领域注入了新的活力和可能性。 文章中详细解释了每个版本的背景、模型结构和应用领域,对读者来说十分易懂,尤其对于那些对自然语言处理领域初步感兴趣的人来说,是一篇很好的入门介绍。此外,对于 GPT 系列的未来展望也提供了很有价值的信息,让读者对该技术的前景有了更清晰的认识。 总体来说,这篇文章不仅增加了我对 GPT 系列的了解,也使我对自然语言处理技术的发展方向有了更深入的认识。期待未来 GPT 系列能够在更多领域做出更加卓越的贡献,为我们的生活和工作带来更多便利和创新。 MHA、MQA、GQA区别和联系 MOOJ: 这篇文章详尽地介绍了Meta最新版本LLaMA 2中所采用的注意力机制,特别是GQA和MQA,这些新的注意力机制为自然语言处理领域带来了创新。GQA引入了分组查询注意力的概念,使得多头注意力更加灵活和高效。MQA则通过共享Keys和Values来减少参数量,提高了推理速度。通过对MHA、MQA和GQA的对比,清晰展示了各个注意力机制在推理速度和模型精度上的权衡。 特别值得赞扬的是,作者不仅介绍了这些注意力机制的原理,还提及了相应的论文以及改进方法。这样的详细解释让读者更好地理解这些创新技术,并且可以通过进一步阅读论文了解更多细节。另外,文章中的图示非常直观,有助于读者更好地理解复杂的概念。 总的来说,这篇文章对于那些对自然语言处理和注意力机制感兴趣的人来说是非常有价值的。它提供了深入了解这些新技术的框架,为读者进一步探索和研究提供了极好的指导。 NLP(四)词形还原(Lemmatization) imel03: 词形还原就是“去掉单词的词缀”? 这不是词干提取(stemming)的内容吗 studies ——>去掉词缀——> studi ,不符合你的描述 python之字符串(str)和编码 今天不要敲代码: 可算明白了,太感谢了表情包 django多任务开启rabbitmq,并进行声明队列、发送、阻塞监听消息 TFATS: 这项目后期确实也使用了supervisor开启服务,功能会更全一些。如果服务端不工作的话,看看supervisor的配置文件,以及服务是否成功开启。