
Commit

Feat/0.2.2.2 (dataelement#286)

patch, fix chatmessage bug

yaojin3616 authored Jan 22, 2024
2 parents fe3bb21 + 6e42129 commit d6e40a5
Showing 7 changed files with 167 additions and 116 deletions.
31 changes: 28 additions & 3 deletions docker/mysql/conf/my.cnf
@@ -2,11 +2,36 @@
default-character-set=utf8mb4

[mysql]
-default-character-set=utf8mb4
+default_character_set = utf8mb4

[mysqld]
+default_storage_engine = InnoDB
+character_set_server = utf8mb4
+collation-server = utf8mb4_unicode_ci
+
+datadir = /var/lib/mysql/var
+lower_case_table_names = 1
+wait_timeout = 3600
+interactive_timeout = 3600
+performance_schema = ON
+
+log_bin = /var/lib/mysql/mysql-bin
+log_bin_index = /var/lib/mysql/mysql-bin.index
+expire_logs_days = 14
+max_binlog_size = 1G
+binlog_format = ROW
+binlog_cache_size = 16M
+sync_binlog = 0
+
+log_error = /var/log/mysql.err
+general_log = 0
+general_log_file = /var/log/mysql.log
+
+slow_query_log = 1
+long_query_time = 0.1
+log_queries_not_using_indexes = 1
+slow_query_log_file = /var/log/mysql-slow.log
+
init_connect='SET collation_connection = utf8mb4_unicode_ci, NAMES utf8mb4'
-character-set-server=utf8mb4
-collation-server=utf8mb4_unicode_ci
# skip-character-set-client-handshake
sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
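
Not part of the diff: a quick way to confirm the charset settings take effect after the container restarts. A minimal sketch assuming pymysql (already implied by the backend's SQLAlchemy URL) and a locally reachable instance; host, user, and password are placeholders:

    import pymysql

    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root',
                           password='<your-password>', charset='utf8mb4')
    with conn.cursor() as cur:
        cur.execute("SHOW VARIABLES LIKE 'character_set_server'")
        print(cur.fetchone())  # expected: ('character_set_server', 'utf8mb4')
        cur.execute("SHOW VARIABLES LIKE 'collation_server'")
        print(cur.fetchone())  # expected: ('collation_server', 'utf8mb4_unicode_ci')
    conn.close()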
3 changes: 2 additions & 1 deletion src/backend/bisheng/api/v1/endpoints.py
@@ -178,7 +178,7 @@ async def process_flow(
        source_documents = task_result.pop('source_documents', '')
        answer = list(task_result.values())[0]
        extra = {}
-       source = await judge_source(answer, source_documents, session_id, extra)
+       source, result = await judge_source(answer, source_documents, session_id, extra)

        try:
            question = ChatMessage(user_id=0,
@@ -202,6 +202,7 @@
            session.refresh(message)
            extra.update({'source': source, 'message_id': message.id})
            task_result.update(extra)
+           task_result.update({'result': result})
            if source != 0:
                await process_source_document(source_documents, session_id, message.id, answer)
        except Exception as e:
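
The chatmessage fix in brief: judge_source now returns a (source, result) pair rather than the flag alone, and the handler forwards the answer text under the 'result' key so API callers receive it alongside the source metadata. A sketch of the new contract, an assumption for illustration rather than the function's actual body:

    async def judge_source(answer, source_documents, session_id, extra) -> tuple[int, str]:
        # Hypothetical: classify where the answer came from and hand back
        # the (possibly post-processed) text the client should render.
        source = 1 if source_documents else 0
        return source, str(answer)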
2 changes: 1 addition & 1 deletion src/backend/bisheng/database/models/message.py
@@ -24,7 +24,7 @@ class MessageBase(SQLModelSerializable):
    receiver: Optional[Dict] = Field(index=False, default=None, description='autogen 的发送方')
    intermediate_steps: Optional[str] = Field(sa_column=Column(Text), description='过程日志')
    files: Optional[str] = Field(sa_column=Column(String(length=4096)), description='上传的文件等')
-   remark: Optional[str] = Field(sa_column=Column(String(length=10000)), description='备注')
+   remark: Optional[str] = Field(sa_column=Column(String(length=4096)), description='备注')
    create_time: Optional[datetime] = Field(
        sa_column=Column(DateTime, nullable=False, server_default=text('CURRENT_TIMESTAMP')))
    update_time: Optional[datetime] = Field(
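
The remark column shrinks from String(10000) to String(4096), matching files. A plausible motive, not stated in the commit: under utf8mb4 a VARCHAR costs up to 4 bytes per character against InnoDB's 65,535-byte row limit, and trimming the column keeps the row comfortably inside it.

    # Back-of-envelope check (assumption: worst-case 4 bytes/char under utf8mb4)
    print(10_000 * 4)  # 40000 bytes for the old remark column alone
    print(4_096 * 4)   # 16384 bytes after the change, same budget as files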
2 changes: 1 addition & 1 deletion src/backend/bisheng/interface/initialize/vector_store.py
@@ -229,7 +229,7 @@ def initial_milvus(class_object: Type[Milvus], params: dict, search_kwargs: dict
            select(Knowledge).where(Knowledge.collection_name == col)).first()

    if not knowledge:
-       raise Exception(f'不能找到知识库collection={col}')
+       raise ValueError(f'不能找到知识库collection={col} knowledge_id={collection_id}')
    model_param = settings.get_knowledge().get('embeddings').get(knowledge.model)
    if knowledge.model == 'text-embedding-ada-002':
        embedding = OpenAIEmbeddings(**model_param)
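
Raising ValueError instead of a bare Exception, and including the knowledge_id in the message, lets callers catch the missing-collection case specifically. A hedged usage sketch; the logger name and call site are illustrative, not from this diff:

    import logging

    logger = logging.getLogger(__name__)
    try:
        store = initial_milvus(Milvus, params, search_kwargs)
    except ValueError as e:
        # The message now carries both the collection name and the knowledge_id.
        logger.warning('knowledge lookup failed: %s', e)
        raise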
4 changes: 3 additions & 1 deletion src/backend/bisheng/template/frontend_node/chains.py
@@ -94,7 +94,9 @@ def format_field(field: TemplateField, name: Optional[str] = None) -> None:
        if name == 'APIChain' and field.name == 'limit_to_domains':
            field.show = True
            field.required = True
-           field.field_type = 'list'
+           field.field_type = 'str'
+           field.is_list = True
+           field.value = ['']

        field.advanced = False
        if 'key' in field.name:
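
Instead of a raw 'list' field, the frontend now gets a string field flagged is_list=True and seeded with one empty entry, which presumably renders as a repeatable text input. The collected list feeds LangChain's APIChain domain allow-list; a hedged example, with llm and api_docs assumed configured elsewhere:

    from langchain.chains import APIChain

    chain = APIChain.from_llm_and_api_docs(
        llm,                                           # assumed: any configured LLM
        api_docs,                                      # assumed: the target API's docs string
        limit_to_domains=['https://api.example.com'],  # the list this UI field collects
    )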
239 changes: 131 additions & 108 deletions src/backend/test/milvus_trans.py
@@ -1,5 +1,6 @@
import json

+import requests
from bisheng_langchain.embeddings import HostEmbeddings
from bisheng_langchain.vectorstores import Milvus
from pymilvus import Collection, MilvusException
@@ -20,121 +21,143 @@
database_url = 'mysql+pymysql://root:[email protected]:3306/bisheng?charset=utf8mb4'
engine = create_engine(database_url, connect_args={}, pool_pre_ping=True)

(The migration script below, previously at module level, is re-indented into a new milvus_clean() helper; the deleted lines are its old top-level form. A new elastic_clean() routine is appended and invoked at the end. Reconstructed new version:)

def milvus_clean():
    params['collection_name'] = 'partition_textembeddingada002_knowledge_1'
    openai_target = Milvus.from_documents(embedding=embedding, **params)
    params['collection_name'] = 'partition_multilinguale5large_knowledge_1'
    host_targe = Milvus.from_documents(embedding=embedding, **params)
    with Session(engine) as session:
        db_knowledge = session.exec(
            'select id, collection_name, model, index_name from knowledge').all()
        for knowledge in db_knowledge:
            # if not knowledge[1].startswith('col'):
            #     if knowledge[3].startswith('col'):
            #         # migration finished
            #         print(f"drop id={knowledge}")
            #         params['collection_name'] = knowledge[3]
            #         cli = Milvus.from_documents(embedding=embedding, **params)
            #         if cli.col:
            #             cli.col.drop()
            #         time.sleep(1)
            #     continue
            if knowledge[1].startswith('col'):
                print(f'deal id={knowledge[0]} model={knowledge[2]} col={knowledge[1]}')
                params['collection_name'] = knowledge[1]
                cli = Milvus.from_documents(embedding=embedding, **params)
                if not cli.col:
                    print(f'escape id={knowledge[0]} col={knowledge[1]}')
                    index_name = knowledge[1]
                    col_name = f'partition_{knowledge[2]}_knowledge_1'.replace('-', '')
                    sql = 'update knowledge set collection_name="%s", index_name="%s" where id=%d' % (
                        col_name, index_name, knowledge[0])
                    session.exec(sql)
                    session.commit()
                    continue
                fields = [s.name for s in cli.col.schema.fields if s.name != 'pk']
                print(fields)
                pks = cli.col.query(expr='file_id>1')
                pk_len = len(pks)
                if pk_len == 0:
                    continue
                li = []
                batch_size = 500
                for i in range(0, pk_len, batch_size):
                    end = min(i + batch_size, pk_len)
                    pk_ids = [str(pk.get('pk')) for pk in pks[i:end]]
                    pk_with_fields = cli.col.query(f"pk in [{','.join(pk_ids)}]",
                                                   output_fields=fields)
                    li.extend(pk_with_fields)
                if knowledge[2] == 'text-embedding-ada-002':
                    target = openai_target
                elif knowledge[2] == 'multilingual-e5-large':
                    target = host_targe
                else:
                    continue

                insert_fields = [s.name for s in target.col.schema.fields if s.name != 'pk']
                insert_dict = {
                    'text': [],
                    'vector': [],
                    'file_id': [],
                    'knowledge_id': [],
                    'page': [],
                    'source': [],
                    'bbox': [],
                    'extra': []
                }
                for data in li:
                    insert_dict.get('text').append(data.get('text'))
                    insert_dict.get('vector').append(data.get('vector'))
                    insert_dict.get('file_id').append(data.get('file_id'))
                    insert_dict.get('knowledge_id').append(f'{knowledge[0]}')

                    if 'bbox' in fields:
                        if data.get('bbox'):
                            insert_dict.get('bbox').append(
                                '{"chunk_bboxes":%s}' %
                                (json.loads(data.get('bbox')).get('chunk_bboxes')))
                            if json.loads(data.get('bbox')).get('source'):
                                insert_dict.get('source').append(
                                    json.loads(data.get('bbox')).get('source'))
                            if json.loads(data.get('bbox')).get('chunk_bboxes')[0].get('page'):
                                insert_dict.get('page').append(
                                    json.loads(data.get('bbox')).get('chunk_bboxes')[0].get('page'))
                        else:
                            insert_dict.get('bbox').append('')
                    else:
                        insert_dict.get('bbox').append('')
                    if 'source' in fields:
                        insert_dict.get('source').append(data.get('source'))
                        if len(insert_dict.get('source')) != len(insert_dict.get('bbox')):
                            insert_dict.get('source').append('')
                    if 'page' in fields:
                        insert_dict.get('page').append(data.get('page') if data.get('page') else 1)

                    insert_dict.get('extra').append('')

                total_count = len(li)
                batch_size = 1000
                for i in range(0, total_count, batch_size):
                    # Grab end index
                    end = min(i + batch_size, total_count)
                    # Convert dict to list of lists batch for insertion
                    insert_list = [insert_dict[x][i:end] for x in insert_fields]
                    # Insert into the collection
                    try:
                        res = target.col.insert(insert_list, timeout=100)
                        print(res)
                    except MilvusException:
                        print(f'Failed to insert batch starting at entity: {i}/{total_count}')
                        raise

                index_name = knowledge[1]
                col_name = f'partition_{knowledge[2]}_knowledge_1'.replace('-', '')
                sql = 'update knowledge set collection_name="%s", index_name="%s" where id=%d' % (
                    col_name, index_name, knowledge[0])
                session.exec(sql)
                session.commit()
                print(f'deal_done id={knowledge[0]} index={index_name}')
                cli.col.drop()


def elastic_clean():
    url = 'http://192.168.106.116:9200/_stats'
    headers = {'Authorization': 'Basic ZWxhc3RpYzpvU0dMLXpWdlo1UDNUbTdxa0RMQw=='}
    del_url = 'http://192.168.106.116:9200/%s'
    col = requests.get(url, headers=headers).json()
    for c in col.get('indices').keys():
        if c.startswith('tmp'):
            print(c)
            x = requests.delete(del_url % c, headers=headers)
            # url = f'http://
        elif col.get('indices').get(c).get('primaries').get('docs').get('count') == 0:
            print(c)
            x = requests.delete(del_url % c, headers=headers)
            print(x)


elastic_clean()
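
One detail worth noting in the insert loop above: pymilvus' list-of-lists Collection.insert form is positional, so the column batches must follow the target schema's field order. That is presumably why insert_list is built by iterating insert_fields (taken from target.col.schema.fields) rather than relying on insert_dict's own key order. A minimal illustration with hypothetical fields:

    schema_fields = ['text', 'vector', 'file_id']  # order as reported by the collection schema
    column_data = {'file_id': [1, 2], 'text': ['a', 'b'], 'vector': [[0.1], [0.2]]}
    insert_list = [column_data[name] for name in schema_fields]
    # -> [['a', 'b'], [[0.1], [0.2]], [1, 2]]: aligned with the schema, not dict order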
2 changes: 1 addition & 1 deletion (seventh changed file; filename not preserved in this capture)
@@ -313,7 +313,7 @@ async def _agenerate(
"""Generate chat completion with retry."""
message_dicts, params = self._create_message_dicts(messages, stop)
params = {**params, **kwargs}
if self.streaming:
if self.streaming and 'infer' not in self.host_base_url:
inner_completion = ''
role = 'assistant'
params['stream'] = True
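
The extra guard appears to disable the token-streaming branch for hosts whose base URL contains 'infer', presumably inference endpoints that cannot stream, so those fall through to the one-shot completion path. A toy restatement of the condition; the function name is illustrative:

    def use_streaming(streaming: bool, host_base_url: str) -> bool:
        # Assumption drawn from the diff: 'infer' in the URL marks a non-streaming endpoint.
        return streaming and 'infer' not in host_base_url

    print(use_streaming(True, 'http://host/v2/models/mymodel/infer'))  # False -> one-shot
    print(use_streaming(True, 'http://host/v1/chat/completions'))      # True  -> stream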
