llm app alerts improvements: extract true query, improve dedupe prompt. (#4767)

* from old PR

* add intent detection

* alerts dedupe + update pathway version

(the Pathway update is needed because `subscribe` did not consolidate updates,
so the UDF-based dedupe did not work)

* fix isort

* confirm alert in response

* better diff prompt

* improve dedupe again. Separate input query

* revert poetry lockfile

---------

Co-authored-by: mdmalhou <[email protected]>
GitOrigin-RevId: 58886f925e223eba8f06bfa847ddcb2dd385517e
2 people authored and Manul from Pathway committed Oct 23, 2023
1 parent 5572bb3 commit 0ca8fb1
Showing 5 changed files with 336 additions and 1 deletion.
3 changes: 3 additions & 0 deletions examples/pipelines/alert/__init__.py
@@ -0,0 +1,3 @@
from .app import run

__all__ = ["run"]
248 changes: 248 additions & 0 deletions examples/pipelines/alert/app.py
@@ -0,0 +1,248 @@
"""
Microservice for a context-aware ChatGPT assistant.
The following program reads in a collection of documents,
embeds each document using the OpenAI document embedding model,
then builds an index for fast retrieval of documents relevant to a question,
effectively replacing a vector database.
The program then starts a REST API endpoint serving queries about programming in Pathway.
Each query text is first turned into a vector using the OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is built from the relevant documentation pages
and sent to the OpenAI chat completion service (GPT-3.5-turbo by default) for processing.
Usage:
In the root of this repository run:
`poetry run ./run_examples.py alert`
or, if all dependencies are managed manually rather than using poetry,
`python examples/pipelines/alert/app.py`
You can also run this example directly in an environment with llm_app installed.
To call the REST API:
curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq
"""

import os

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex

from llm_app import deduplicate, send_slack_alerts
from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel


class DocumentInputSchema(pw.Schema):
doc: str


class QueryInputSchema(pw.Schema):
query: str
user: str


# Helper Functions
@pw.udf
def build_prompt(documents, query):
docs_str = "\n".join(documents)
prompt = f"""Please process the documents below:
{docs_str}
Respond to query: '{query}'
"""
return prompt


@pw.udf
def build_prompt_check_for_alert_request_and_extract_query(query: str) -> str:
prompt = f"""Evaluate the user's query and identify if there is a request for notifications on answer alterations:
User Query: '{query}'
Respond with 'Yes' if there is a request for alerts, and 'No' if not,
followed by the query without the alerting request part.
Examples:
"Tell me about windows in Pathway" => "No. Tell me about windows in Pathway"
"Tell me and alert about windows in Pathway" => "Yes. Tell me about windows in Pathway"
"""
return prompt


@pw.udf
def split_answer(answer: str) -> tuple[bool, str]:
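    # For example, the model answer 'Yes. Tell me about windows in Pathway'
    # becomes (True, 'Tell me about windows in Pathway').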
alert_enabled = "yes" in answer[:3].lower()
true_query = answer[3:].strip(' ."')
return alert_enabled, true_query


def build_prompt_compare_answers(new: str, old: str) -> str:
prompt = f"""
Are the two following responses deviating?
Answer with Yes or No.
First response: "{old}"
Second response: "{new}"
"""
return prompt


def make_query_id(user, query) -> str:
return str(hash(query + user)) # + str(time.time())


@pw.udf
def construct_notification_message(query: str, response: str) -> str:
return f'New response for question "{query}":\n{response}'


@pw.udf
def construct_message(response, alert_flag):
if alert_flag:
return response + "\n\n🔔 Activated"
return response


def decision_to_bool(decision: str) -> bool:
return "yes" in decision.lower()


def run(
*,
data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/pathway-docs/"),
api_key: str = os.environ.get("OPENAI_API_TOKEN", ""),
host: str = "0.0.0.0",
port: int = 8080,
embedder_locator: str = "text-embedding-ada-002",
embedding_dimension: int = 1536,
model_locator: str = "gpt-3.5-turbo",
max_tokens: int = 400,
temperature: float = 0.0,
slack_alert_channel_id=os.environ.get("SLACK_ALERT_CHANNEL_ID", ""),
slack_alert_token=os.environ.get("SLACK_ALERT_TOKEN", ""),
**kwargs,
):
# Part I: Build index
embedder = OpenAIEmbeddingModel(api_key=api_key)

documents = pw.io.jsonlines.read(
data_dir,
schema=DocumentInputSchema,
mode="streaming",
autocommit_duration_ms=50,
)

enriched_documents = documents + documents.select(
data=embedder.apply(text=pw.this.doc, locator=embedder_locator)
)

index = KNNIndex(
enriched_documents.data, enriched_documents, n_dimensions=embedding_dimension
)

# Part II: receive queries, detect intent and prepare cleaned query

query, response_writer = pw.io.http.rest_connector(
host=host,
port=port,
schema=QueryInputSchema,
autocommit_duration_ms=50,
keep_queries=True,
)

model = OpenAIChatGPTModel(api_key=api_key)

query += query.select(
prompt=build_prompt_check_for_alert_request_and_extract_query(query.query)
)
query += query.select(
tupled=split_answer(
model.apply(
pw.this.prompt,
locator=model_locator,
temperature=0.3,
max_tokens=100,
)
),
)
query = query.select(
pw.this.user,
alert_enabled=pw.this.tupled[0],
query=pw.this.tupled[1],
)

query += query.select(
data=embedder.apply(text=pw.this.query, locator=embedder_locator),
query_id=pw.apply(make_query_id, pw.this.user, pw.this.query),
)

# Part III: respond to queries

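    # For each query, retrieve the 3 nearest documents from the index and
    # attach them as `documents_list`, keyed by the query table's universe.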
query_context = query + index.get_nearest_items(query.data, k=3).select(
documents_list=pw.this.doc
).with_universe_of(query)

prompt = query_context.select(
pw.this.query_id,
pw.this.query,
pw.this.alert_enabled,
prompt=build_prompt(pw.this.documents_list, pw.this.query),
)

responses = prompt.select(
pw.this.query_id,
pw.this.query,
pw.this.alert_enabled,
response=model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=max_tokens,
),
)

output = responses.select(
result=construct_message(pw.this.response, pw.this.alert_enabled)
)

response_writer(output)

# Part IV: send alerts about responses which changed significantly.

responses = responses.filter(pw.this.alert_enabled)

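    # The acceptor decides whether a freshly computed response should replace
    # the previously kept one: identical strings never re-alert; otherwise the
    # LLM judges whether the two answers deviate.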
def acceptor(new: str, old: str) -> bool:
if new == old:
return False

decision = model(
build_prompt_compare_answers(new, old),
locator=model_locator,
max_tokens=20,
)
return decision_to_bool(decision)

pw.io.jsonlines.write(responses, "./examples/ui/data/new_responses.jsonl")

deduplicated_responses = deduplicate(
responses,
col=responses.response,
acceptor=acceptor,
instance=responses.query_id,
)
pw.io.jsonlines.write(
deduplicated_responses, "./examples/ui/data/deduped_responses.jsonl"
)

alerts = deduplicated_responses.select(
message=construct_notification_message(pw.this.query, pw.this.response)
)
send_slack_alerts(alerts.message, slack_alert_channel_id, slack_alert_token)

pw.run()


if __name__ == "__main__":
run()
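
For reference, a minimal Python equivalent of the curl call in the docstring above (illustrative only; it assumes the service is already running locally on port 8080 and that the `requests` package is installed):

import requests

resp = requests.post(
    "http://localhost:8080/",
    json={
        "user": "user",
        "query": "How to connect to Kafka in Pathway? Alert me on changes.",
    },
    timeout=60,
)
# When an alerting request is detected, the returned text ends with "🔔 Activated".
print(resp.text)
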
9 changes: 8 additions & 1 deletion llm_app/__init__.py
@@ -1,4 +1,11 @@
from llm_app import model_wrappers as model_wrappers
from llm_app.processing import chunk_texts, extract_texts
from llm_app.utils import deduplicate, send_slack_alerts

__all__ = ["model_wrappers", "extract_texts", "chunk_texts"]
__all__ = [
"model_wrappers",
"extract_texts",
"chunk_texts",
"deduplicate",
"send_slack_alerts",
]
69 changes: 69 additions & 0 deletions llm_app/utils.py
@@ -0,0 +1,69 @@
from typing import Any, Callable, TypeVar

import pathway as pw
import requests


def send_slack_alerts(
message: pw.ColumnReference, slack_alert_channel_id, slack_alert_token
):
def send_slack_alert(key, row, time, is_addition):
if not is_addition:
return
alert_message = row[message.name]
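        # Note: the body is form-encoded by hand, so messages containing '&' or '='
        # would need URL-encoding (e.g. urllib.parse.quote) before being embedded here.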
requests.post(
"https://slack.com/api/chat.postMessage",
data="text={}&channel={}".format(alert_message, slack_alert_channel_id),
headers={
"Authorization": "Bearer {}".format(slack_alert_token),
"Content-Type": "application/x-www-form-urlencoded",
},
).raise_for_status()

pw.io.subscribe(message._table, send_slack_alert)


TDedupe = TypeVar("TDedupe")
TSchema = TypeVar("TSchema")


def deduplicate(
table: pw.Table[TSchema],
*,
col: pw.ColumnReference,
    instance: pw.ColumnReference | None = None,
acceptor: Callable[[TDedupe, TDedupe], bool],
) -> pw.Table[TSchema]:
"""Deduplicates rows in `table` on `col` column using acceptor function.
It keeps rows for which acceptor returned previous value
Args:
table (pw.Table[TSchema]): table to deduplicate
col (pw.ColumnReference): column used for deduplication
acceptor (Callable[[TDedupe, TDedupe], bool]): callback telling whether two values are different
instance (pw.ColumnReference, optional): Group column for which deduplication will be performed separately.
Defaults to None.
Returns:
        pw.Table[TSchema]: the table with only the rows whose `col` value was accepted.
"""
assert col.table == table
assert instance is None or instance.table == table
previous_states: dict[Any, TDedupe | None] = dict()

    # Keeping state in Python, accessed by a non-pure UDF. This is a Pathway antipattern.
    # TODO: refactor once we have a proper differentiation operator.

@pw.udf
def is_different_with_state(new_state: TDedupe, key: Any) -> bool:
prev_state = previous_states.get(key, None)
if prev_state is None:
previous_states[key] = new_state
return True
are_different = acceptor(new_state, prev_state)
if are_different:
previous_states[key] = new_state
return are_different

return table.filter(is_different_with_state(col, instance) == True) # noqa: E712
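
A minimal usage sketch for `deduplicate` (the schema, input path, and threshold below are illustrative, not part of the library):

import pathway as pw

from llm_app import deduplicate


class ReadingSchema(pw.Schema):
    sensor: str
    temperature: float


readings = pw.io.jsonlines.read(
    "./readings/", schema=ReadingSchema, mode="streaming"
)


def moved_enough(new: float, old: float) -> bool:
    # Keep a new reading only if it moved by more than 2 degrees
    # since the last reading kept for the same sensor.
    return abs(new - old) > 2.0


significant = deduplicate(
    readings,
    col=readings.temperature,
    instance=readings.sensor,
    acceptor=moved_enough,
)
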
8 changes: 8 additions & 0 deletions run_examples.py
@@ -151,6 +151,14 @@ def unstructured(**kwargs):
return run(**kwargs)


@cli.command()
@common_options
def alert(**kwargs):
from examples.pipelines.alert import run

return run(**kwargs)


def main():
cli.main()
