With Snowflake Cortex, we have powerful tools for implementing sophisticated AI solutions directly inside our data ecosystem. This post takes you beyond the basics, demonstrating how to build an end-to-end solution that integrates Document AI, LLM chat, and interactive visualizations, all inside Snowflake.
The solution discussed here extracts PDFs from publicly accessible repositories and gives users access to the relevant details as chunks of text, along with references to the original document locations. By using Cortex for data interaction and Document AI for extracting key metrics in structured form, the solution efficiently processes and loads the data into an in-house repository for various business purposes. Here, I have used the USDA public dataset to showcase its effectiveness in improving data accessibility and interaction.
Solution Overview
Our solution consists of four components:
- Web scraping and data ingestion
- Document AI for structured data extraction
- LLM chat for data interaction
- Streamlit app for visualization and user interaction
Let's dive into each component. Before that, set up the network rule and external access integration that allow us to reach our source URLs from inside the Snowflake account.
Configuring Egress Network Rules for Data Ingestion in Snowflake
The network rule below (My_outbound_network_rule_pdf) permits outbound network traffic to specified host and port combinations. The rule covers domains like http://usda.library.cornell.edu/ and can also list specific IP addresses with optional ports. This configuration ensures controlled external access for data transfers or API calls.
CREATE OR REPLACE NETWORK RULE My_outbound_network_rule_pdf
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('downloads.usda.library.cornell.edu','usda.library.cornell.edu');
Configuring External Access Integration for Secure Data Retrieval
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION www_access_integration
ALLOWED_NETWORK_RULES = (My_outbound_network_rule_pdf)
ENABLED = true;
1. Web Scraping and Data Ingestion
To start our pipeline, we've implemented a Python stored procedure that fetches PDF documents from the USDA website and stores them in a Snowflake internal stage.
This stored procedure fetches PDF links from a given URL and its subsequent pages, downloads those PDFs, and uploads them to a Snowflake stage (@DOCS). It uses threading to manage multiple downloads concurrently and ensures that already uploaded PDFs are not duplicated.
It is important to set the stage encryption to ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE') for LLM operations:
create or replace stage docs ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE') DIRECTORY = ( ENABLE = true );
CREATE OR REPLACE PROCEDURE usfd_python_file_pro(start_url STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'fetch_pdfs_and_store_pro'
EXTERNAL_ACCESS_INTEGRATIONS = (www_access_integration)
PACKAGES = ('requests', 'beautifulsoup4', 'snowflake-snowpark-python')
AS
$$
import requests
from bs4 import BeautifulSoup
import os
from urllib.parse import urljoin, urlparse
from threading import Thread, Lock

def get_pdf_links(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    pdf_links = [link['href'] for link in soup.find_all('a', href=True) if link['href'].lower().endswith('.pdf')]
    return pdf_links

def get_next_page_link(page_url):
    response = requests.get(page_url)
    soup = BeautifulSoup(response.content, 'html.parser')
    next_page_link = soup.find('a', rel='next', href=True)
    if next_page_link:
        next_page_href = next_page_link['href']
        if next_page_href.startswith('http'):
            return next_page_href
        else:
            return urljoin(page_url, next_page_href)
    return None

def is_file_uploaded(session, filename):
    query = f"LIST @DOCS/{filename}"
    try:
        result = session.sql(query).collect()
        return len(result) > 0
    except Exception as e:
        print(f"Error checking file existence: {str(e)}")
        return False

def download_and_upload_pdf(session, pdf_url, lock):
    pdf_name = os.path.basename(urlparse(pdf_url).path)
    try:
        with lock:
            if is_file_uploaded(session, pdf_name):
                print(f"Skipping already uploaded PDF: {pdf_name}")
                return
        response = requests.get(pdf_url)
        if response.status_code == 200:
            print(f"Downloading PDF: {pdf_url}")
            tmp_dir = '/tmp/snowflake/'
            os.makedirs(tmp_dir, exist_ok=True)
            local_file_path = os.path.join(tmp_dir, pdf_name)
            with open(local_file_path, 'wb') as f:
                f.write(response.content)
            stage_path = f"@DOCS/{pdf_name}"
            put_result = session.file.put(f"file://{local_file_path}", stage_path, auto_compress=False)
            if put_result[0].status == 'UPLOADED':
                print(f"Uploaded PDF: {pdf_name} to Snowflake stage")
            else:
                print(f"Failed to upload PDF: {pdf_name}")
            os.remove(local_file_path)
        else:
            print(f"Failed to download PDF from {pdf_url}. Status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching PDF from URL: {pdf_url} - {str(e)}")

def fetch_pdfs_and_store_pro(session, start_url):
    page_url = start_url
    lock = Lock()
    threads = []
    max_threads = 10
    while page_url:
        pdf_links = get_pdf_links(page_url)
        print(f"PDF links on {page_url}: {pdf_links}")
        for pdf_link in pdf_links:
            pdf_url = pdf_link if pdf_link.startswith('http') else urljoin('https://usda.library.cornell.edu', pdf_link)
            thread = Thread(target=download_and_upload_pdf, args=(session, pdf_url, lock))
            thread.start()
            threads.append(thread)
            if len(threads) >= max_threads:
                for thread in threads:
                    thread.join()
                threads = []
        page_url = get_next_page_link(page_url)
    for thread in threads:
        thread.join()
    return "PDF metadata fetching and storing completed"
$$;
CALL usfd_python_file_pro('https://usda.library.cornell.edu/concern/publications/j098zb09z?locale=en'); -- test call
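After the test call completes, you can verify the ingestion by listing the staged files or querying the stage's directory table (the stage was created with DIRECTORY = (ENABLE = true)). A quick sanity check might look like this:
-- Refresh the directory table and confirm the PDFs landed on the stage
ALTER STAGE DOCS REFRESH;
LIST @DOCS;
SELECT RELATIVE_PATH, SIZE, LAST_MODIFIED FROM DIRECTORY(@DOCS) ORDER BY LAST_MODIFIED DESC;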
2. Setting Up Streams to Capture Changes
Next, create a stream (ACRE_PDF_STREAM) on the @DOCS stage to capture changes to its directory table, enabling efficient tracking of new, modified, or deleted files on the stage. Setting up this stream allows monitoring and processing of PDF files as they are added to the DOCS stage.
CREATE or REPLACE STREAM DOCAI_LLM_ML.AI_ML.ACRE_PDF_STREAM ON STAGE DOCS;
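If you want to see what the stream is tracking before anything consumes it, a simple query shows the pending directory changes; the stream's offset only advances when it is read inside a DML statement that commits:
-- Peek at pending changes; METADATA$ACTION indicates inserts and deletes
SELECT METADATA$ACTION, RELATIVE_PATH, SIZE, LAST_MODIFIED FROM ACRE_PDF_STREAM;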
3. Key Metrics Extraction Using Document AI
Create a table to capture the extracted data:
CREATE OR REPLACE TABLE DOCAI_LLM_ML.AI_ML.DOCAI_PDF_PREDICTION (
file_name STRING,
file_size INTEGER,
last_modified TIMESTAMP_NTZ,
snowflake_file_url STRING,
ocrScore FLOAT,
acres_score FLOAT,
acres_value STRING,
states_score FLOAT,
states_value STRING,
CornHarvested_score FLOAT,
CornHarvested_value STRING,
CornPlanted_score FLOAT,
CornPlanted_value STRING,
WheatHarvested_score FLOAT,
WheatHarvested_value STRING,
WheatPlanted_score FLOAT,
WheatPlanted_value STRING
);
The DOCAI_PREDICTION procedure processes newly inserted PDF files from ACRE_PDF_STREAM, extracting the relevant values and prediction scores using the Document AI model. It then inserts this data into the DOCAI_LLM_ML.AI_ML.DOCAI_PDF_PREDICTION table, together with metadata like file name, size, and URL. Finally, it counts the total number of processed files and returns this count as a string.
CREATE OR REPLACE PROCEDURE DOCAI_PREDICTION ()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
file_count INTEGER;
BEGIN
-- Initialize file_count variable to 0
  file_count := 0;

  -- Insert data into the table
  INSERT INTO DOCAI_LLM_ML.AI_ML.DOCAI_PDF_PREDICTION (
file_name,
file_size,
last_modified,
snowflake_file_url,
ocrScore,
acres_score,
acres_value,
states_score,
states_value,
CornHarvested_score,
CornHarvested_value,
CornPlanted_score,
CornPlanted_value,
WheatHarvested_score,
WheatHarvested_value,
WheatPlanted_score,
WheatPlanted_value
)
WITH temp AS (
SELECT
RELATIVE_PATH AS file_name,
SIZE AS file_size,
LAST_MODIFIED::TIMESTAMP_NTZ(9) AS last_modified,
GET_PRESIGNED_URL('@DOCS', RELATIVE_PATH) AS snowflake_file_url,
DOCAI_LLM_ML.AI_ML.DOCAI_PEDICTION_MODEL!PREDICT(GET_PRESIGNED_URL('@DOCS', RELATIVE_PATH), 4) AS json_prediction
FROM ACRE_PDF_STREAM
WHERE METADATA$ACTION = 'INSERT'
)
SELECT
file_name,
file_size,
last_modified,
snowflake_file_url,
json_prediction:__documentMetadata.ocrScore::FLOAT AS ocrScore,
json_prediction:acres[0].score::FLOAT AS acres_score,
json_prediction:acres[0].value::STRING AS acres_value,
json_prediction:states[0].score::FLOAT AS states_score,
json_prediction:states[0].value::STRING AS states_value,
json_prediction:CornHarvested[0].score::FLOAT AS CornHarvested_score,
json_prediction:CornHarvested[0].value::STRING AS CornHarvested_value,
json_prediction:CornPlanted[0].score::FLOAT AS CornPlanted_score,
json_prediction:CornPlanted[0].value::STRING AS CornPlanted_value,
json_prediction:WheatHarvested[0].score::FLOAT AS WheatHarvested_score,
json_prediction:WheatHarvested[0].value::STRING AS WheatHarvested_value,
json_prediction:WheatPlanted[0].score::FLOAT AS WheatPlanted_score,
json_prediction:WheatPlanted[0].value::STRING AS WheatPlanted_value
FROM temp;
  -- Set file_count to the number of files processed and inserted
  file_count := (SELECT COUNT(*) FROM DOCAI_LLM_ML.AI_ML.DOCAI_PDF_PREDICTION);
  -- Return a string with the count information
  RETURN 'Inserted ' || file_count::STRING || ' files into DOCAI_PDF_PREDICTION table.';
END;
$$;
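With the procedure in place, you can invoke it manually to process whatever the stream has already captured, before scheduling it:
CALL DOCAI_PREDICTION();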
4. Creating a Task to Materialize the Model Output
Create a task ACRE_PDF_TASK to run the DOCAI_PREDICTION procedure every 60 minutes. If any new documents are identified in the stream, the prediction data is ingested into the DOCAI_PDF_PREDICTION table.
CREATE OR REPLACE TASK ACRE_PDF_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '60 minutes'
COMMENT = 'Task to insert predictions into DOCAI_PDF_PREDICTION table'
AS
CALL DOCAI_PREDICTION();
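Note that Snowflake tasks are created in a suspended state, so resume the task (and optionally trigger a one-off run) before expecting scheduled executions:
ALTER TASK ACRE_PDF_TASK RESUME;
EXECUTE TASK ACRE_PDF_TASK;  -- optional: run once immediately instead of waiting for the schedule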
5. LLM Chat with Data
Processing and Chunking PDF Text
This table function (UDTF), pdf_text_chunker, processes PDF files by extracting their text and splitting it into chunks. The function consists of:
read_pdf method: Reads the PDF file from the Snowflake stage, extracts the text using PyPDF2, and logs any extraction errors.
process method: Splits the extracted text into chunks using RecursiveCharacterTextSplitter with a 4,000-character chunk size and a 400-character overlap, converts the chunks into a DataFrame, and yields them for insertion into the database.
Importance of Chunking and Overlapping
Chunking: Large Language Models (LLMs) have a limit on the number of tokens they can process at once. By chunking the text, we ensure the input size stays manageable and within the model's limits.
Overlapping: Overlapping ensures that context is preserved across chunks. This is crucial for maintaining continuity of information, which is especially important for models that rely on contextual understanding to make accurate predictions or generate coherent responses. Without overlap, the model might lose important context at the boundaries of chunks.
create or replace function pdf_text_chunker(file_url string)
returns table (chunk varchar)
language python
runtime_version = '3.9'
handler = 'pdf_text_chunker'
packages = ('snowflake-snowpark-python','PyPDF2', 'langchain')
as
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:
        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")
        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())
        reader = PyPDF2.PdfReader(buffer)
        text = ""
        for page in reader.pages:
            try:
                text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
            except:
                text = "Unable to Extract"
                logger.warning(f"Unable to extract from file {file_url}, page {page}")
        return text

    def process(self, file_url: str):
        text = self.read_pdf(file_url)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 4000,   # Adjust this as you see fit
            chunk_overlap = 400, # Overlap keeps neighbouring chunks contextual
            length_function = len
        )
        chunks = text_splitter.split_text(text)
        df = pd.DataFrame(chunks, columns=['chunks'])
        yield from df.itertuples(index=False, name=None)
$$;
Create a Table to Store the Vector Embeddings
This script creates a table, CHUNK_AI_ML, to store metadata, text chunks, and vector embeddings of the text extracted from the PDF files, facilitating efficient storage and processing for machine learning applications in Snowflake.
CREATE OR REPLACE TABLE CHUNK_AI_ML(
RELATIVE_PATH VARCHAR(16777216),
SIZE NUMBER(38,0),
FILE_URL VARCHAR(16777216),
SCOPED_FILE_URL VARCHAR(16777216),
CHUNK VARCHAR(16777216),
CHUNK_VEC VECTOR(FLOAT, 768)
);
This code inserts data into the CHUNK_AI_ML table by selecting the relevant metadata and text chunks from the PDF files stored on the docs stage. It builds scoped URLs for the files, processes the text into chunks using the pdf_text_chunker function, and generates vector embeddings for each chunk using Snowflake's EMBED_TEXT_768 function. This prepares the text data for efficient AI/ML tasks.
insert into chunk_ai_ml(relative_path, size, file_url,
    scoped_file_url, chunk, chunk_vec)
select relative_path,
    size,
    file_url,
    build_scoped_file_url(@docs, relative_path) as scoped_file_url,
    func.chunk as chunk,
    SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', chunk) as chunk_vec
from
    directory(@docs),
    TABLE(pdf_text_chunker(build_scoped_file_url(@docs, relative_path))) as func;
Automating Text Extraction and Embedding from PDF Files
Create a task named task_extract_chunk_vec_from_pdf that runs every minute when new data is detected in the ACRE_PDF_STREAM. The task inserts metadata, text chunks, and vector embeddings into the CHUNK_AI_ML table. It selects the required data from the ACRE_PDF_STREAM, processes the PDF files using the pdf_text_chunker function created earlier, and generates vector embeddings using the SNOWFLAKE.CORTEX.EMBED_TEXT_768 function. This automates the extraction, chunking, and embedding of text from new PDF files as they are added to the stream.
SNOWFLAKE.CORTEX.EMBED_TEXT_768 generates 768-dimensional vector embeddings from text. These embeddings are useful for various AI/ML tasks, such as text classification, clustering, and similarity search. By transforming text into numerical vectors, the function enables efficient and scalable text processing (a quick standalone example follows the list of related functions below).
Other Options in Cortex
CORTEX.EMBED_TEXT_512: Generates 512-dimensional vector embeddings, suitable for applications where a smaller vector size is sufficient.
CORTEX.EMBED_TEXT_1024: Provides 1024-dimensional vector embeddings for scenarios requiring higher precision and more detailed text representations.
CORTEX.CLASSIFY_TEXT: Classifies text into predefined categories using a trained machine learning model.
CORTEX.DETECT_ENTITIES: Identifies and extracts named entities from text, such as names, dates, and places.
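As a quick, standalone illustration of the embedding function used in this pipeline (not part of the pipeline itself), the snippet below embeds an ad-hoc phrase and ranks the stored chunks by cosine similarity:
-- Embed an ad-hoc phrase and find the three closest stored chunks
SELECT RELATIVE_PATH,
       VECTOR_COSINE_SIMILARITY(CHUNK_VEC,
           SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', 'corn acres planted by state')) AS similarity,
       CHUNK
FROM CHUNK_AI_ML
ORDER BY similarity DESC
LIMIT 3;
The task that automates chunking and embedding for newly staged files is defined below.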
CREATE OR REPLACE TASK task_extract_chunk_vec_from_pdf
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 minute'
WHEN system$stream_has_data('ACRE_PDF_STREAM')
AS
INSERT INTO chunk_ai_ml(relative_path, size, file_url,
scoped_file_url, chunk, chunk_vec)
SELECT relative_path,
size,
file_url,
build_scoped_file_url(@docs, relative_path) AS scoped_file_url,
func.chunk AS chunk,
SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', chunk) AS chunk_vec
FROM ACRE_PDF_STREAM,
TABLE(pdf_text_chunker(build_scoped_file_url(@docs, relative_path))) AS func;
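As with the earlier task, resume it, and once it has fired you can spot-check that chunks and embeddings are being written:
ALTER TASK task_extract_chunk_vec_from_pdf RESUME;

-- After a run, confirm that chunk rows and their embeddings are arriving
SELECT RELATIVE_PATH, LEFT(CHUNK, 80) AS chunk_preview FROM CHUNK_AI_ML LIMIT 5;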
Building the RAG (Retrieval-Augmented Generation)
In RAG, a language model (LLM) is used in conjunction with a retrieval mechanism to provide more contextually relevant and accurate responses by leveraging external data sources. Here's a brief explanation of how it works in this context:
Vector Similarity Search:
The get_similar_chunks function uses vector cosine similarity to find the most relevant text chunks from the chunk_ai_ml table based on the user's question. This step retrieves contextual information from the PDF data stored in Snowflake.
Chat History and Question Summarization:
The get_chat_history function retrieves the chat history to maintain context across interactions.
The summarize_question_with_history function extends the user's question with relevant chat history to ensure the generated query is contextually enriched.
Prompt Creation:
The create_prompt function constructs a prompt that includes the retrieved text chunks (context) and the extended question. It ensures the language model has access to relevant information when generating responses.
Response Generation:
The complete function uses the snowflake.cortex.complete function to generate a response based on the constructed prompt. This leverages Snowflake Cortex's capabilities to provide a context-aware answer.
# Assumes `session` (Snowpark session), `num_chunks`, `slide_window`, and the
# Streamlit session-state defaults are initialized elsewhere in the app.

def get_similar_chunks(question):
    cmd = """
        WITH results AS (
            SELECT RELATIVE_PATH,
                   VECTOR_COSINE_SIMILARITY(chunk_ai_ml.chunk_vec,
                       SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', ?)) AS similarity,
                   chunk
            FROM chunk_ai_ml
            ORDER BY similarity DESC
            LIMIT ?
        )
        SELECT chunk, relative_path FROM results
    """
    df_chunks = session.sql(cmd, params=[question, num_chunks]).to_pandas()
    return df_chunks

def get_chat_history():
    start_index = max(0, len(st.session_state.messages) - slide_window)
    return st.session_state.messages[start_index:-1]

def summarize_question_with_history(chat_history, question):
    prompt = f"""
        Based on the chat history below and the question, generate a query that extends the question
        with the chat history provided. The query should be in natural language.
        Answer with only the query. Do not add any explanation.
        <chat_history>
        {chat_history}
        </chat_history>
        <question>
        {question}
        </question>
        """
    cmd = "SELECT snowflake.cortex.complete(?, ?) AS response"
    df_response = session.sql(cmd, params=[st.session_state.model_name, prompt]).collect()
    summary = df_response[0].RESPONSE.replace("'", "")
    return summary

def create_prompt(myquestion):
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history:
            question_summary = summarize_question_with_history(chat_history, myquestion)
            df_chunks = get_similar_chunks(question_summary)
        else:
            df_chunks = get_similar_chunks(myquestion)
    else:
        df_chunks = get_similar_chunks(myquestion)
        chat_history = ""
    prompt_context = "".join(df_chunks['CHUNK'].replace("'", ""))
    # Print reference files
    st.sidebar.subheader("Reference Files:")
    unique_files = set()
    for _, row in df_chunks.iterrows():
        file_name = row['RELATIVE_PATH']
        if file_name not in unique_files:
            st.sidebar.write(f"- {file_name}")
            unique_files.add(file_name)
    prompt = f"""
        You are an expert chat assistant that extracts information from the CONTEXT provided
        between <context> and </context> tags.
        You offer a chat experience considering the information included in the CHAT HISTORY
        provided between <chat_history> and </chat_history> tags.
        When answering the question contained between <question> and </question> tags
        be concise and do not hallucinate.
        If you don't have the information just say so.
        Do not mention the CONTEXT used in your answer.
        Do not mention the CHAT HISTORY used in your answer.
        <chat_history>
        {chat_history}
        </chat_history>
        <context>
        {prompt_context}
        </context>
        <question>
        {myquestion}
        </question>
        Answer:
        """
    return prompt

def complete(myquestion):
    prompt = create_prompt(myquestion)
    cmd = "SELECT snowflake.cortex.complete(?, ?) AS response"
    df_response = session.sql(cmd, params=[st.session_state.model_name, prompt]).collect()
    return df_response
6. Streamlit App for Visualization and Interaction
This Streamlit application provides an interactive interface for users to explore and analyze the structured data extracted with Document AI. It also features an AI-powered chatbot that retrieves relevant information from internal documents and augments it with an LLM, presenting the information in a useful way. By integrating both structured metrics and chat interactions over the same data, analysts, business users, and researchers can work from a single source of truth, making their tasks faster and easier.
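The full app is beyond the scope of this post, but the sketch below shows how the chat portion might be wired up on top of the helper functions from section 5. It assumes a Snowpark session and those helpers (create_prompt, complete) are available in the same module; the default model name is only a placeholder.
import streamlit as st

st.title("USDA Document AI + Cortex Chat")

# Placeholder defaults; in the real app these come from sidebar controls
st.session_state.setdefault("model_name", "mixtral-8x7b")
st.session_state.setdefault("use_chat_history", True)
st.session_state.setdefault("messages", [])

# Replay the conversation so far
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Accept a new question, run it through the RAG helpers, and display the answer
if question := st.chat_input("Ask about the USDA acreage reports"):
    st.session_state.messages.append({"role": "user", "content": question})
    with st.chat_message("user"):
        st.markdown(question)
    df_response = complete(question)  # complete() from section 5 returns the Cortex result rows
    answer = df_response[0].RESPONSE
    st.session_state.messages.append({"role": "assistant", "content": answer})
    with st.chat_message("assistant"):
        st.markdown(answer)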