查看原文
其他

AIGC 利器 Ray 云原生探索之路--分布式构建本地知识库

熊中祥 道客船长 2023-07-04



在 ChatGPT 火爆全网之后,各行各业都在围绕它来进行构建自己场景的应用。这里以本地知识库为场景来聊聊怎么结合 Ray Core、Ray Serve、KubeRay、LangChain、Embeddings、向量数据库、LLM、Kubernetes 和 GPU 方案来构建属于自己的本地的知识库。对整体方案和 Ray 相关感兴趣的可以阅读前面发布的一些文章,AIGC 利器-Ray 云原生探索之路--总览篇》,《AIGC 利器 Ray 云原生探索之路--Ray Core 篇 (上)》,《AIGC 利器 Ray 云原生探索之路--Ray Core 篇 (下)》。

01

基本概念

Embeddings:在自然语言处理中,Embeddings 通常指的是将单词或短语映射到连续向量空间中的向量表示。

向量索引:向量索引(Vector Indexing)是一种用于快速检索相似向量的技术。它是基于向量空间模型(Vector Space Model)的思想,通过构建索引结构来加速相似向量的查询。在向量索引中,每个向量都被映射到一个高效的数据结构中,使得可以根据向量之间的相似度进行快速检索。如 FAISS,LlamaIndex 等。

向量数据库:支持向量数据的存储和查询的数据库就可以理解成是向量数据库,目前也有很多基于原有的数据库实现了向量插件的数据库,比如 Postgresql,ElasticSearch 等等。

LLM:Large Language Model 大语言模型。LLM 是一种基于深度学习的人工智能模型,具备强大的自然语言处理能力。它通过大规模的预训练过程,学习了广泛的语言知识和语言模式,并可以根据输入的文本生成连贯、合理的回复。

Prompt:在自然语言处理中,Prompt 是指用户给出的一段文字或指令,用于引导或提示语言模型生成特定的回复或完成特定的任务。Prompt 可以是一个问题、一个描述性的语句、一句指令,甚至是一个完整的上下文对话。它作为输入提供给语言模型,以指导模型生成相关的回答或执行相应的任务。Prompt 的设计因为直接影响语言模型的输出,所以非常重要。一个设计合理的 Prompt 可以引导模型生成更准确、更有针对性的回答。通过在 Prompt 中提供所需的背景信息、明确的要求或限制条件,可以控制模型的回答风格、内容的方向性,或者针对特定任务的执行。

LangChain:提高了一个基于 LLM 构建应用的框架。支持 6 个主要的场景支持,包括 LLM/Prompt(和大语言模型对接,提高大语言模型生成内容的质量),Chains(提供链式的方式运行各种任务),Data Augmented Generation(数据增强的生成能力),Agents(通过大语言模型结合外部工具,完成定制化能力的任务),Memory(提供大语言模型聊天的历史上下文能力),Evaluation(生成式模型很难用传统指标来评估。评估它们的一种新方法是使用语言模型本身。LangChain 提供了一些 prompts/chains 来帮助实现这一点)。

Ray Core:是 Ray 框架的底层框架,提供了一整套的分布式计算的框架,可以将普通的应用转化成分布式的系统。这里主要以 Python 语言的程序为主,具体可以参看前面的文章。

Ray Serve:是一个可扩展的模型服务库,用于构建在线推理 API。Serve 是框架无关的,所以可以使用一个工具包来服务所有的东西,从使用 PyTorch、Tensorflow 和 Keras 等框架构建的深度学习模型,到 Scikit-Learn 模型,再到任意的 Python 业务逻辑。Serve 特别适合于模型组合,使您能够构建由多个 ML 模型和业务逻辑组成的复杂推理服务(全部使用 Python 代码)。Ray Serve 是建立在 Ray Core 之上的,所以它很容易扩展到许多机器上,并提供灵活的调度支持。

Fine-Tune:基于数据对模型进行微调。这个过程也是涉及到对 LLM 的训练。目前,社区也有不少开放出来的模型都是基于这种方式的。至于是不要对模型进行调整,可以根据你的需求,如果你的任务和 LLM 本身是有不同的,是想基于 LLM 构建自己任务的大模型,那就需要微调模型。如果 LLM 本身和你的任务是一致的,比如只是为了让 LLM 知道更多知识,那也不一定要调整模型本身,用原来的基础模型就可以。

Search-Ask 方法:是一种用于问答系统的策略,结合了基于检索和基于生成的方法。它旨在充分利用检索和生成两种不同的技术,以提供更全面和准确的答案。在 Search-Ask 方法中,首先进行搜索(Search)阶段。用户的查询被发送到一个检索系统,该系统在预先建立的知识库或文档集合中执行搜索,并返回与查询相关的文档或候选答案。然后是询问(Ask)阶段。在此阶段,通过使用基于生成的方法,从检索到的文档或候选答案中提取或生成最终的答案。这可能涉及文本摘要、实体识别、关系抽取等技术来提取关键信息,或者使用生成模型(如语言模型)来生成具有上下文和语义连贯性的答案。通过结合搜索和生成两个阶段,Search-Ask 方法旨在通过检索提供初始的答案候选集,并通过生成进行下一步的筛选和精炼。这种方法的优势在于,检索阶段可以快速找到相关的文档或信息,而生成阶段可以通过理解上下文和语义生成更准确和丰富的答案。需要注意的是,具体实施 Search-Ask 方法的细节可以因应用场景和具体需求而有所不同。这种方法可以根据实际情况进行调整和定制,以达到最佳的问答效果。

02

本地向量处理

在 OpenAI 的 API 服务中提供了 embeddings 的 API,就是用于根据提供的文本输入,生成向量。但是这种是需要 OpenAI 的账号以及可以连接到 OpenAI 的 API 访问地址,而且还是收费的 API。所以,在内网或者国内的环境,如果有离线的方式也可以完成类似工作,那就是更好的选择。答案是肯定的。

在线的 API:

   curl https://api.openai.com/v1/embeddings \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -d '{
    "input": "Your text string goes here",
    "model": "text-embedding-ada-002"
  }'

离线的方式:

以下 code sample 中使用了 HuggingFace 的 Embeddings 的模型 “text2vec-large-chinese” 来完成这个能力。其中 lHuggingFaceEmbeddings 有两个方法。一个是 embed_documents 方法,用于在构建 embeddings 的时候被使用。还有一个是 embed_query 方法,是在提问的整个环境中,以及在和 LLM 通信前,需要被使用到的方法,来完成对提问的文本的向量化处理。

from langchain.embeddings import HuggingFaceEmbeddings
 
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})

使用 embeddings 对文档进行向量化处理:

  • 基于 pgvector 完成向量处理和向量数据的保存:

PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver=os.environ.get("PGVECTOR_DRIVER""psycopg2"),
    host=os.environ.get("PGVECTOR_HOST""xxx.xxx.xxx.xxx"),
    port=int(os.environ.get("PGVECTOR_PORT""5432")),
    database=os.environ.get("PGVECTOR_DATABASE""testpg"),
    user=os.environ.get("PGVECTOR_USER""test"),
    password=os.environ.get("PGVECTOR_PASSWORD""xxx"),
)  
 
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
 
PGVector.from_documents(
        embedding=embeddings,
        documents=shard,
        collection_name="langchain_pg_collection01",
        connection_string=PGVECTOR_CONNECTION_STRING,
        distance_strategy=DistanceStrategy.COSINE,
        pre_delete_collection=False
)
  • 基于 elasticsearch 完成向量处理和向量数据的保存:

elastic_host = "xxx.xxx.xxx.xxx"
elasticsearch_url = f"http://elastic:McGX42vk5y705TQoA8c126bh@{elastic_host}:30003"
index_name="test_index"
 
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
 
ElasticVectorSearch.from_documents(shard, embeddings, index_name=index_name, elasticsearch_url=elasticsearch_url)

03

串行向量化

串行指的是在处理的过程中没有并发多任务处理能力,有一个 worker 顺序执行的方式去处理整个过程,包括数据文件的读取、文本的拆分以及文本的向量处理,到写入向量数据库。如果在数据量很大的情况下,整个效果会很慢,不能充分地利用好整个集群的可用资源去处理相关的任务。

import os
import time
from typing import List
import numpy as np
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from tqdm import tqdm
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
 
 
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver=os.environ.get("PGVECTOR_DRIVER""psycopg2"),
    host=os.environ.get("PGVECTOR_HOST""xxx.xxx.xxx.xxx"),
    port=int(os.environ.get("PGVECTOR_PORT""5432")),
    database=os.environ.get("PGVECTOR_DATABASE""xxx"),
    user=os.environ.get("PGVECTOR_USER""xxx"),
    password=os.environ.get("PGVECTOR_PASSWORD""xxx"),
)
 
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
    length_function=len,
)
 
# Put your directory containing PDFs here
directory = '/root/ray/docker/know'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
print(f"pdf_documents is {pdf_documents}")
 
langchain_documents = []
for document in tqdm(pdf_documents):
    try:
        loader = PyPDFLoader(document)
        data = loader.load()
        langchain_documents.extend(data)
    except Exception:
        continue
 
print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
chunks = text_splitter.split_documents(langchain_documents)
 
# Stage two: embed the docs.
print(f"Loading chunks into vector store ... using {db_shards} shards")
st = time.time()
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
PGVector.from_documents(
        embedding=embeddings,
        documents=chunks,
        collection_name="langchain_pg_collection01",
        connection_string=PGVECTOR_CONNECTION_STRING,
        distance_strategy=DistanceStrategy.COSINE,
        pre_delete_collection=False
)
et = time.time() - st
print(f"Shard processing complete. Time taken: {et} seconds.")
以上的 sample code 主要分为两个阶段:
  • 第一个阶段:主要是读取数据文件,并进行文本的拆分。

  • 第二个阶段:使用拆分出来的文本,使用 Embeddings 模型进行向量化处理,这里使用的是 “text2vec-large-chinese” 模型,可以根据需要替换不同的 Embeddings 模型, 然后持久化到 pg 这个类型的向量数据库中。

04

并行向量化

并行指的是在处理的过程中有并发多任务处理能力,有 n 个 worker 并行的方式去运行各种任务。如果在数据量很大的情况下,整个数据的向量化处理能力,会随着可用资源的增多,有很明显的提升。能充分的利用好整个集群的可用资源去处理相关的任务。

import os
import time
from typing import List
import numpy as np
import ray
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from tqdm import tqdm
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
 
 
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver=os.environ.get("PGVECTOR_DRIVER""psycopg2"),
    host=os.environ.get("PGVECTOR_HOST""10.29.26.43"),
    port=int(os.environ.get("PGVECTOR_PORT""5432")),
    database=os.environ.get("PGVECTOR_DATABASE""ats_dev"),
    user=os.environ.get("PGVECTOR_USER""ats"),
    password=os.environ.get("PGVECTOR_PASSWORD""ats.123456"),
)
 
 
db_shards = 8
ray.init()
 
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
    length_function=len,
)
 
# Put your directory containing PDFs here
directory = '/root/ray/docker/know'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
print(f"pdf_documents is {pdf_documents}")
 
langchain_documents = []
for document in tqdm(pdf_documents):
    try:
        loader = PyPDFLoader(document)
        data = loader.load()
        langchain_documents.extend(data)
    except Exception:
        continue
 
print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
chunks = text_splitter.split_documents(langchain_documents)
 
@ray.remote(num_gpus=1)
def process_shard(shard):
    print(f"Starting process_shard of {len(shard)} chunks.")
    st = time.time()
    embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
    et = time.time() - st
    print(f"Loading embeddings took {et} seconds.")
    st = time.time()
    db = PGVector.from_documents(
        embedding=embeddings,
        documents=shard,
        collection_name="langchain_pg_collection01",
        connection_string=PGVECTOR_CONNECTION_STRING,
        distance_strategy=DistanceStrategy.COSINE,
        pre_delete_collection=False
    )
    et = time.time() - st
    print(f"Shard completed in {et} seconds.")
    return 1
 
# Stage two: embed the docs.
print(f"Loading chunks into vector store ... using {db_shards} shards")
st = time.time()
shards = np.array_split(chunks, db_shards)
futures = [process_shard.remote(shards[i]) for i in range(db_shards)]
results = ray.get(futures)
et = time.time() - st
print(f"Shard processing complete. Time taken: {et} seconds.")

以上的 sample code 主要分为三个阶段:

  • 第一个阶段:主要是读取数据文件,并进行文本的拆分,为后续的并行处理提供分组数据。这里将数据分成了 8 个分组,这里是根据 db_shards 设置的数值来决定的。

  • 第二个阶段:使用拆分出来的文本,将拆分出来的数据进行分组,每组数据进行并行的向量化的处理。这里关键方法是@ray.remote(num_gpus=1) def process_shard(shard),这里使用到了 Ray Core 的 Task 的能力,Task 是可以充分利用好集群的资源,分布式的调度和运行。由于每一个负责分组处理的 Task 需要完成 Embeddings,而 Embeddings 是需要模型参与的,这里使用的是“text2vec-large-chinese”模型,可以根据需要替换不同的 Embeddings 模型, 这里是给每一个 Task 都分配一个 GPU 去独立的处理。这样的并行效率是很高的。虽然有一些模型是可以用 CPU 去运行的,但是效率还是非常低的。

  • 第三个阶段:最后,将每组向量化后的数据进行合并后,再写入向量数据库,这样完成了整个并行化向量处理的能力。

05

串行知识问答

在问答的过程中,首先需要根据问题进行向量化的处理,然后根据处理后的向量数据去向量数据库中去检索,来找到和问题相似度很高的一些上下文信息,再结合 Prompt 来得到最终需要输入给 LLM 的内容。

06

并行知识问答

在基于基本的流程之外。这里使用了 Ray Serve 的在线并发推理能力,具体的并发能力还是和 GPU 的设备数量相关,可以根据实际情况设置对应的 Ray Serve 的副本数来提升并发推理的能力,这里就是 Ray Serve Deployment 的 num_replicas 配置,这里设置了 1,会使用一张 GPU 设备提供 Serve,可根据实际情况去调整副本的数量。这里使用的是“text2vec-large-chinese”模型作为 Embeddings;使用 “chatglm2-6b-int4” 模型作为 LLM 模型。

import os
import time
from typing import List
import requests
from ray import serve
from starlette.requests import Request
import numpy as np
import ray
from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.base import Embeddings
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
from langchain.prompts import PromptTemplate
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from langchain.embeddings import HuggingFaceEmbeddings
 
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver=os.environ.get("PGVECTOR_DRIVER""psycopg2"),
    host=os.environ.get("PGVECTOR_HOST""xxx.xxx.xxx.xxx"),
    port=int(os.environ.get("PGVECTOR_PORT""5432")),
    database=os.environ.get("PGVECTOR_DATABASE""xxx"),
    user=os.environ.get("PGVECTOR_USER""xxx"),
    password=os.environ.get("PGVECTOR_PASSWORD""xxx"),
)
 
template = """
<|SYSTEM|># chatglm2-6b
- You are a helpful, polite, fact-based agent for answering questions. 
- Your answers include enough detail for someone to follow through on your suggestions. 
<|USER|>
If you don't know the answer, just say that you don't know. Don't try to make up an answer.
Please answer the following query using the context provided. 
 
CONTEXT: 
{context}
=========
QUESTION: {query} 
ANSWER: <|ASSISTANT|>"
""
PROMPT = PromptTemplate(template=template, input_variables=["context""query"])
 
 
class chatGLM():
    def __init__(self, model_name) -> None:
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        self.model = AutoModel.from_pretrained(model_name, trust_remote_code=True).half().cuda().eval()
 
    def __call__(self, prompt) -> any:
        max_length = 10000
        temperature = 0.01
        response, _ = self.model.chat(self.tokenizer , prompt, max_length=max_length, temperature=temperature)
        return response
 
 
class ChatglmChain():
    def __init__(self, llm, prompt) -> None:
        self.llm = llm
        self.prompt = prompt
 
    def run(self, query, context=None) -> any:
        if context is not None:
            prompt = self.prompt.format(query=query, context=context)
        else:
            prompt = self.prompt.format(query=query)
        print("query=%s  -> prompt=%s"%(query, prompt))
        print("*"*60)
        response = self.llm(prompt) 
        return response
 
 
@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 1})
class VectorSearchDeployment:
    def __init__(self):
        st = time.time()
        self.embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
        self.db = PGVector(
            connection_string=PGVECTOR_CONNECTION_STRING, 
            embedding_function=self.embeddings, 
            collection_name="langchain_pg_collection01",
            distance_strategy=DistanceStrategy.COSINE
        )
 
        et = time.time() - st
        print(f"Loading database took {et} seconds.")
 
        self.llm =  chatGLM(model_name="/root/ray/docker/chatglm2-6b-int4")
 
        st = time.time()
        self.chain = ChatglmChain(llm=self.llm, prompt=PROMPT)
        et = time.time() - st
        print(f"Loading HF model took {et} seconds.")
        print("===== end =====")
 
    def search(self, query):
        print(f"query is: {query}")
        search_results = self.db.similarity_search_with_score(query, k=4)
        context = ""
        for pack in search_results:
            doc, socre = pack
            content = doc.page_content
            print("检索到的知识=%s, from=%s, socre=%.3f"%(content, doc.metadata.get("from"), socre))
            context += content
 
        response = self.chain.run(query=query, context=context)
        print(f"Result is: {response}")
        return response
 
    async def __call__(self, request: Request) -> List[str]:
            return self.search(request.query_params["query"])
 
 
deployment = VectorSearchDeployment.bind()

以上的 sample code 主要分为五个阶段:

  • 第一个阶段:首先定义了 QADeployment 类,这个类中完成整体的代码的封装和逻辑。这里将这个类标记成 Ray 的 Serve,只要将类使用相关的标注 “@serve.deployment(ray_actor_options={"num_gpus": 1})”。这里使用到了 Ray Core 的 Actor,以及使用 GPU 的资源来提供推理服务。同时在处理请求的时候,使用了 Ray Core 的 async 能力,让其可以支持异步方式处理请求。

  • 第二个阶段:在__init__过程中,首先会从向量数据库中加载向量数据,这里使用了支持 pgvector 的 postgresql 数据库作为向量数据库;其次会设置用于对问题进行向量化处理的 embeddings 的模型,这里使用的是 “text2vec-large-chinese” 模型;然后指定 LLM,这里使用的是 “chatglm2-6b-int4” 模型;最后,使用 LangChain 定义 chain 对象,将 LLM,Prompt 等关联起来。至此,Serve 的初始化环节就结束了。

  • 第三个阶段:在接收提问的过程中,首先会使用问题进行向量化处理后,再使用向量的相似度查询去获取问题相关的上下文,使用问题以及问题相关的上下文结合 PromptTemplate 完成 LLM 的输入内容的封装,最后去传递给 LLM 去做最后的问题的回答,得到 LLM 的回答。

  • 第四个阶段:最后使用 deployment = QADeployment.bind()的方法定义 Ray Serve 应用,然后通过 serve run serve:deployment 的方式启动 serve 应用,对外提供推理服务。

  • 第五个阶段:server 应用在本地启动的默认的监听的 port 是 8000。接下来就可以去 call 这个地址去完成问答了。

07

离线模型

在企业内部建议下载离线的模型,使用本地的方式加载模型,因为一般模型都很大,在运行的时候在线下载模型不是很稳定,耗时也很久。所以建议下载到本地。可以到https://huggingface.co/上去 search 到自己需要的模型,然后下载所有的相关文件到本地就可以。在构建 Dockerfile 的时候,将这些文件 COPY 到指定的容器的文件夹下,然后程序在加载 model 的时候,用包含模型文件的文件夹即可,包括 Embeddings 和 LLM 模型等。

下载 text2vec-large-chinese 模型:

下载 chatglm2-6b-int4 模型:

08

镜像准备

向量构建相关:

CPU 类型的镜像,用于启动 Ray Cluster 的 Head 节点。
from rayproject/ray:2.5.0.128cf3-py310
 
RUN sudo apt-get update
 
RUN sudo apt-get -y install libpq-dev python3-dev
 
COPY requirements.txt /tmp
 
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
 
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
 
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
COPY know.tar.gz /tmp
 
RUN tar -zxvf /tmp/know.tar.gz -C /tmp
GPU 类型的镜像,用于启动 Ray Cluster 的 Worker 节点。

from rayproject/ray-ml:2.5.0.87d3e6-py310-gpu
 
RUN sudo apt-get update
 
RUN sudo apt-get -y install libpq-dev python3-dev
 
COPY requirements.txt /tmp
 
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
 
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
 
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
COPY know.tar.gz /tmp
 
RUN tar -zxvf /tmp/know.tar.gz -C /tmp

问答系统相关:

CPU 类型的镜像,用于启动 Ray Cluster 的 Head 节点。
from rayproject/ray:2.5.0.128cf3-py310
 
RUN sudo apt-get update
 
RUN sudo apt-get -y install libpq-dev python3-dev
 
COPY requirements.txt /tmp
 
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
 
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
 
COPY chatglm2-6b-int4 /tmp/chatglm2-6b-int4
 
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
RUN sudo mkdir /home/ray/qa
 
COPY qa.zip /home/ray/qa/
GPU 类型的镜像,用于启动 Ray Cluster 的 Worker 节点。
from rayproject/ray-ml:2.5.0.87d3e6-py310-gpu
 
RUN sudo apt-get update
 
RUN sudo apt-get -y install libpq-dev python3-dev
 
COPY requirements.txt /tmp
 
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
 
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
 
COPY chatglm2-6b-int4 /tmp/chatglm2-6b-int4
 
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
 
RUN sudo mkdir /home/ray/qa
 
COPY qa.zip /home/ray/qa/

09

实践--向量构建

构建向量索引(1 张 GPU):这里实用 kuberay 项目的 RayJob CRD 来完成向量的构建,它会负责创建一个 Ray Cluster,同时在 Ray Cluster 启动成功之后,将 configmap 中挂载的 Job 提交给 Ray Cluster,当 Job 中的所有任务运行结束之后,向量构建就算完成了。同时支持多个 GPU 并行文件的向量化处理, 这里 db_shards=1 ,这里是为了对比单卡和多卡的区别,使用了一张 GPU 设备,所以设置成了 1,这样运行下的并行的增效并不明显。可以根据本地的可用的 GPU 设备来调整并行的 tasks 数量,这样就可以完全并行起来了。head group 使用 CPU 类型的镜像,worker group 使用 GPU 类型的镜像。
apiVersion: v1
kind: ConfigMap
metadata:
  name: buildpgvector
data:
  buildpgvector.py: |
    import os
    import time
    from typing import List
    import numpy as np
    import ray
    from langchain.embeddings.base import Embeddings
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.vectorstores.pgvector import PGVector
    from langchain.vectorstores.pgvector import DistanceStrategy
    from tqdm import tqdm
    from langchain.document_loaders import PyPDFLoader
    from langchain.embeddings import HuggingFaceEmbeddings
 
    PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver=os.environ.get("PGVECTOR_DRIVER""psycopg2"),
        host=os.environ.get("PGVECTOR_HOST""xxx.xxx.xxx.xxx"),
        port=int(os.environ.get("PGVECTOR_PORT""5432")),
        database=os.environ.get("PGVECTOR_DATABASE""xxx"),
        user=os.environ.get("PGVECTOR_USER""xxx"),
        password=os.environ.get("PGVECTOR_PASSWORD""xxx"),
    )
 
    db_shards = 1
    ray.init()
 
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100,
        length_function=len,
    )
 
    # Put your directory containing PDFs here
    directory = '/tmp/know'
    pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
    print(f"pdf_documents is {pdf_documents}")
    langchain_documents = []
    for document in tqdm(pdf_documents):
        try:
            loader = PyPDFLoader(document)
            data = loader.load()
            langchain_documents.extend(data)
        except Exception:
            continue
 
    print("Num pages: ", len(langchain_documents))
    print("Splitting all documents")
    chunks = text_splitter.split_documents(langchain_documents)
 
    @ray.remote(num_gpus=1)
    def process_shard(shard):
        print(f"Starting process_shard of {len(shard)} chunks.")
        st = time.time()
        embeddings = HuggingFaceEmbeddings(model_name="/tmp/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
        et = time.time() - st
        print(f"Loading embeddings took {et} seconds.")
        st = time.time()
        db = PGVector.from_documents(
            embedding=embeddings,
            documents=shard,
            collection_name="langchain_pg_collection01",
            connection_string=PGVECTOR_CONNECTION_STRING,
            distance_strategy=DistanceStrategy.COSINE,
            pre_delete_collection=False
        )
        et = time.time() - st
        print(f"Shard completed in {et} seconds.")
        return 1
 
    # Stage two: embed the docs.
    print(f"Loading chunks into vector store ... using {db_shards} shards")
    st = time.time()
    shards = np.array_split(chunks, db_shards)
    futures = [process_shard.remote(shards[i]) for i in range(db_shards)]
    results = ray.get(futures)
    et = time.time() - st
    print(f"Shard processing complete. Time taken: {et} seconds.")
 
---
 
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
  name: buildpgvector
spec:
  entrypoint: python /home/ray/samples/buildpgvector.py
  runtimeEnv: ewogICAgInBpcCI6IFsKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9
  rayClusterSpec:
    rayVersion: '2.5.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
        num-gpus: "0"
      serviceType: NodePort
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
                - containerPort: 8000
                  name: serve
              resources:
                requests:
                  cpu: "1000m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: buildpgvector
              securityContext:
                privileged: true
          volumes:
            - name: buildpgvector
              configMap:
                name: buildpgvector
                items:
                  - key: buildpgvector.py
                    path: buildpgvector.py
    workerGroupSpecs:
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    nvidia.com/gpu: "1"
                  requests:
                    cpu: "1000m"
                    nvidia.com/gpu: "1"
                securityContext:
                  privileged: true

查看 Ray Cluster 中的 Ray Job 的运行情况(使用 1 张 GPU 卡):从运行的日志看的出,在一个 GPU 的情况下使用了 43s 左右的时间处理了所有的数据。

滑动查看更多

构建向量索引(2 张 GPU):这里实用 kuberay 项目的 RayJob CRD 来完成向量的构建,它会负责创建一个 Ray Cluster,同时在 Ray Cluster 启动成功之后,将 configmap 中挂载的 Job 提交给 Ray Cluster,当 Job 中的所有任务运行结束之后,向量构建就算完成了。同时支持多个 GPU 并行进行文件的向量化处理, 这里 db_shards=2,是因为这里测试的是两张 GPU 卡的Serve 能力,所以设置成了 2,这样就可以完全并行起来了。

apiVersion: v1
kind: ConfigMap
metadata:
  name: buildpgvector
data:
  buildpgvector.py: |
    import os
    import time
    from typing import List
    import numpy as np
    import ray
    from langchain.embeddings.base import Embeddings
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.vectorstores.pgvector import PGVector
    from langchain.vectorstores.pgvector import DistanceStrategy
    from tqdm import tqdm
    from langchain.document_loaders import PyPDFLoader
    from langchain.embeddings import HuggingFaceEmbeddings
 
    PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver=os.environ.get("PGVECTOR_DRIVER""psycopg2"),
        host=os.environ.get("PGVECTOR_HOST""xxx.xxx.xxx.xxx"),
        port=int(os.environ.get("PGVECTOR_PORT""5432")),
        database=os.environ.get("PGVECTOR_DATABASE""xxx"),
        user=os.environ.get("PGVECTOR_USER""xxx"),
        password=os.environ.get("PGVECTOR_PASSWORD""xxx"),
    )
 
    db_shards = 2
    ray.init()
 
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100,
        length_function=len,
    )
 
    # Put your directory containing PDFs here
    directory = '/tmp/know'
    pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
    print(f"pdf_documents is {pdf_documents}")
    langchain_documents = []
    for document in tqdm(pdf_documents):
        try:
            loader = PyPDFLoader(document)
            data = loader.load()
            langchain_documents.extend(data)
        except Exception:
            continue
 
    print("Num pages: ", len(langchain_documents))
    print("Splitting all documents")
    chunks = text_splitter.split_documents(langchain_documents)
 
    @ray.remote(num_gpus=1)
    def process_shard(shard):
        print(f"Starting process_shard of {len(shard)} chunks.")
        st = time.time()
        embeddings = HuggingFaceEmbeddings(model_name="/tmp/text2vec-large-chinese" ,model_kwargs={'device'"cuda"})
        et = time.time() - st
        print(f"Loading embeddings took {et} seconds.")
        st = time.time()
        db = PGVector.from_documents(
            embedding=embeddings,
            documents=shard,
            collection_name="langchain_pg_collection01",
            connection_string=PGVECTOR_CONNECTION_STRING,
            distance_strategy=DistanceStrategy.COSINE,
            pre_delete_collection=False
        )
        et = time.time() - st
        print(f"Shard completed in {et} seconds.")
        return 1
 
    # Stage two: embed the docs.
    print(f"Loading chunks into vector store ... using {db_shards} shards")
    st = time.time()
    shards = np.array_split(chunks, db_shards)
    futures = [process_shard.remote(shards[i]) for i in range(db_shards)]
    results = ray.get(futures)
    et = time.time() - st
    print(f"Shard processing complete. Time taken: {et} seconds.")
 
---
 
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
  name: buildpgvector
spec:
  entrypoint: python /home/ray/samples/buildpgvector.py
  runtimeEnv: ewogICAgInBpcCI6IFsKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9
  rayClusterSpec:
    rayVersion: '2.5.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
        num-gpus: "0"
      serviceType: NodePort
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
                - containerPort: 8000
                  name: serve
              resources:
                requests:
                  cpu: "1000m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: buildpgvector
              securityContext:
                privileged: true
          volumes:
            - name: buildpgvector
              configMap:
                name: buildpgvector
                items:
                  - key: buildpgvector.py
                    path: buildpgvector.py
    workerGroupSpecs:
      - replicas: 2
        minReplicas: 1
        maxReplicas: 5
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    nvidia.com/gpu: "1"
                  requests:
                    cpu: "1000m"
                    nvidia.com/gpu: "1"
                securityContext:
                  privileged: true

查看 Ray Cluster 中的 Ray Job 的运行情况(使用 2 张 GPU 卡):分析运行时间:这里还是相同的数据,但是使用的时间为,一个 task 用了 15s,一个 task 用了 25s,因为这里的两张 GPU 卡型号不同,性能不同。

查看内容向量化处理之后,在向量数据库中的数据保存情况:

10

实践--问答服务

问答服务(一张 GPU 卡)。这里使用了 RayService CRD 来完成 serve 环节。其中 RayService 也是会启动一个 Ray Cluster 集群,同时根据设置的 runtimeEnv 的 working_dir 的 code 所在路径作为工作路径,基于 importPath 的设置启动 Ray Serve。启动成功之后,会对外提供一个 NodePort 类型的 Kubernetes 的 Service,通过这个 Service 可以访问部署出来的基于 Embeddings 和 LLM 的 serve 了。访问的方式就是基于 http 的方式请求就可以。head group 使用 CPU 类型的镜像,worker group 使用 GPU 类型的镜像。这里测试使用一张 GPU 卡的情况,所以 Serve 的副本数只有 1 个,在 cr 的 numReplicas 中设置为 1。

apiVersion: v1
kind: ConfigMap
metadata:
  name: query
data:
  query.py: |
    import sys
    import requests
 
    query = sys.argv[1]
    response = requests.post(f"http://localhost:8000/?query={query}")
    print(response.content.decode())
 
---
 
apiVersion: ray.io/v1alpha1
kind: RayService
metadata:
  name: qallmpg
spec:
  serviceUnhealthySecondThreshold: 3000
  deploymentUnhealthySecondThreshold: 3000
  serveConfig:
    importPath: qa.deployment
    runtimeEnv: |
      working_dir: "file:///home/ray/qa/qa.zip"
    deployments:
      - name: VectorSearchDeployment
        numReplicas: 1
        rayActorOptions:
          numGpus: 1   rayClusterConfig:
    rayVersion: '2.5.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
        num-gpus: "0"
      serviceType: NodePort
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
                - containerPort: 8000
                  name: serve
              resources:
                requests:
                  cpu: "1000m"
              securityContext:
                  privileged: true
              volumeMounts:
                - mountPath: /home/ray/query
                  name: query
          volumes:
            - name: query
              configMap:
                name: query
                items:
                  - key: query.py
                    path: query.py
    workerGroupSpecs:
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    nvidia.com/gpu: "1"
                  requests:
                    cpu: "1000m"
                    nvidia.com/gpu: "1"
                securityContext:
                  privileged: true
                volumeMounts:
                  - mountPath: /home/ray/query
                    name: query
            volumes:
              - name: query
                configMap:
                  name: query
                  items:
                    - key: query.py
                      path: query.py

查看 Ray Cluster 的运行情况和资源使用情况: head 点,一 GPU worker 点。

查看 Ray Cluster 中的 Ray Serve 的运行情况:这里启动了一个副本进行提供服务。

问答服务(两张 GPU 卡)。这里使用了 RayService CRD 来完成 serve 环节。其中 RayService 也是会启动一个 Ray Cluster 集群,同时根据设置的 runtimeEnv 的 working_dir 的 code 所在路径作为工作路径,基于 importPath 的设置启动 Ray Serve。启动成功之后,会对外提供一个 NodePort 类型的 Kubernetes 的 Service,通过这个 Service 可以访问部署出来的基于 Embeddings 和 LLM 的 serve 了。访问的方式就是基于 http 的方式请求。head group 使用 CPU 类型的镜像,worker group 使用 GPU 类型的镜像。这里测试使用两张 GPU 设备,并提供 Serve 的情况,所以 Serve 的副本数有 2 个,在 cr 的 numReplicas 中设置为 2。
apiVersion: v1
kind: ConfigMap
metadata:
  name: query
data:
  query.py: |
    import sys
    import requests
 
    query = sys.argv[1]
    response = requests.post(f"http://localhost:8000/?query={query}")
    print(response.content.decode())
 
---
 
apiVersion: ray.io/v1alpha1
kind: RayService
metadata:
  name: qallmpg
spec:
  serviceUnhealthySecondThreshold: 3000
  deploymentUnhealthySecondThreshold: 3000
  serveConfig:
    importPath: qa.deployment
    runtimeEnv: |
      working_dir: "file:///home/ray/qa/qa.zip"
    deployments:
      - name: VectorSearchDeployment
        numReplicas: 2
        rayActorOptions:
          numGpus: 1
  rayClusterConfig:
    rayVersion: '2.5.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
        num-gpus: "0"
      serviceType: NodePort
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
                - containerPort: 8000
                  name: serve
              resources:
                requests:
                  cpu: "1000m"
              securityContext:
                  privileged: true
              volumeMounts:
                - mountPath: /home/ray/query
                  name: query
          volumes:
            - name: query
              configMap:
                name: query
                items:
                  - key: query.py
                    path: query.py
    workerGroupSpecs:
      - replicas: 2
        minReplicas: 1
        maxReplicas: 5
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    nvidia.com/gpu: "1"
                  requests:
                    cpu: "1000m"
                    nvidia.com/gpu: "1"
                securityContext:
                  privileged: true
                volumeMounts:
                  - mountPath: /home/ray/query
                    name: query
            volumes:
              - name: query
                configMap:
                  name: query
                  items:
                    - key: query.py
                      path: query.py

查看 Ray Cluster 的运行情况和资源使用情况: head 点,两 worker 点,每 worker 点占用一 GPU

查看 Ray Cluster 中的 Ray Serve 的运行情况 : 这里启动了两个副本进行提供服务。

测试问答系统:以下脚本是打包在镜像中的,可以在容器里/home/ray/query 路径下使用 query.py 去测试,如果是外部测试的话,需要根据 RayService 派生出来的 serve 的 Kubernetes Service 对应的 NodePort 去修改 localhost:8000 这个地址就可以。

先找到 xxx-serve-svc 的 Kubernetes Service, 这里就是“qallmpg-serve-svc ” ,找到 8000 端口对应的 NodePort 端口,这里 31223 端口就是可以外部访问的模型服务的端口。因为 Ray Serve 默认启动的服务的监听的 Port 就是 8000。
kubectl get svc
NAME                                     TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                                       AGE
kubernetes                               ClusterIP   10.233.0.1      <none>        443/TCP                                                                                       27d
qallmpg-head-svc                         NodePort    10.233.26.237   <none>        10001:31581/TCP,8265:31199/TCP,52365:32145/TCP,6379:31982/TCP,8080:31512/TCP,8000:30937/TCP   35s
qallmpg-raycluster-rpr9z-dashboard-svc   NodePort    10.233.17.34    <none>        52365:30241/TCP                                                                               115s
qallmpg-raycluster-rpr9z-head-svc        NodePort    10.233.22.76    <none>        10001:31899/TCP,8265:32540/TCP,52365:30413/TCP,6379:31509/TCP,8080:30152/TCP,8000:30252/TCP   115s
qallmpg-serve-svc                        NodePort    10.233.11.80    <none>        8000:31223/TCP                                                                                35s

vi query-external.py

import sys
import requests
 
query = sys.argv[1]
response = requests.post(f"http://10.29.26.99:31223/?query={query}")
print(response.content.decode())

提问:

1.一张 GPU 卡(性能较差的卡)

1.1 每次只问一个问题:

问题 1: karmada 包含哪些组件?        
耗时:86.1 s。
问题 2: cilium service mesh 是什么?   
耗时:85.9 s。

1.2. 每次同时问两个问题:

问题 1: karmada 包含哪些组件?

耗时:173.1 s。
问题 2: cilium service mesh 是什么?   
耗时:147.7 s。

2. 两张 GPU 卡(一张性能较差的卡,一张性能较好的卡)

2.1. 每次只问同一个问题,但是请求被 LB 到不同的卡上的测试效果:
问题: karmada 包含哪些组件?

第一次 耗时:69.1 s。
第二次 耗时:14.5 s。

2.2. 每次同时问两个问题,每个问题被同时 LB 到不同卡上的效果:
问题 1: karmada 包含哪些组件?         

耗时:85.2 s。

问题 2: cilium service mesh 是什么?   
耗时:9.2 s。

11

总结

结合 Ray Core、Ray Serve、KubeRay、LangChain、Embeddings、向量数据库、LLM、Kubernetes 和 GPU 方案来构建知识库可以更大程度的充分利用资源,也可以更大程度的提升整体的灵活性和效率。


参考链接:

https://docs.ray.io/en/latest/

https://github.com/ray-project/kuberay

https://github.com/ray-project/langchain-ray





 本文作者 


熊中祥

现任「DaoCloud 道客」技术合伙人兼云原生技术专家





热门推荐

            

访问以下网址,或点击文末【阅读原文】直接下载

新一代云原生操作系统底座--DCE 5.0 社区版:https://docs.daocloud.io/download/dce5/
任何组织、机构和个人,都能免费体验企业级云原生性能




DaoCloud 公司简介

「DaoCloud 道客」云原生领域的创新领导者,成立于 2014 年底,拥有自主知识产权的核心技术,致力于打造开放的云操作系统为企业数字化转型赋能。产品能力覆盖云原生应用的开发、交付、运维全生命周期,并提供公有云、私有云和混合云等多种交付方式。成立迄今,公司已在金融科技、先进制造、智能汽车、零售网点、城市大脑等多个领域深耕,标杆客户包括交通银行、浦发银行、上汽集团、东风汽车、海尔集团、屈臣氏、金拱门(麦当劳)等。目前,公司已完成了 D 轮超亿元融资,被誉为科技领域准独角兽企业。公司在北京、南京、武汉、深圳、成都设立多家分公司及合资公司,总员工人数超过 350 人,是上海市高新技术企业、上海市“科技小巨人”企业和上海市“专精特新”企业,并入选了科创板培育企业名单。


网址:www.daocloud.io

邮件:info@daocloud.io

电话:400 002 6898



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存