Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,12 @@ gem "rexml", "~> 3.4.2"

gem "git", "~> 4.0"

gem "pdf-reader", "~> 2.15"

gem "ruby-openai", "~> 8.3"

gem "paper_trail", "~> 17.0"

gem "tokenizers", "~> 0.6.3"
16 changes: 16 additions & 0 deletions server/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,7 @@ GEM
activerecord (>= 5.2)
thor (1.4.0)
timecop (0.9.8)
timeout (0.6.1)
tiny_tds (3.4.0)
bigdecimal (>= 2.0.0)
tiny_tds (3.4.0-aarch64-linux-gnu)
bigdecimal (>= 2.0.0)
tiny_tds (3.4.0-x86_64-linux-gnu)
bigdecimal (>= 2.0.0)
tokenizers (0.6.3-aarch64-linux)
tokenizers (0.6.3-arm64-darwin)
tokenizers (0.6.3-x86_64-darwin)
tokenizers (0.6.3-x86_64-linux)
traces (0.18.2)
track_open_instances (0.1.15)
trailblazer-option (0.1.2)
ttfunk (1.8.0)
Expand Down Expand Up @@ -2369,6 +2384,7 @@ DEPENDENCIES
strong_migrations
temporal-ruby!
timecop
tokenizers (~> 0.6.3)
tzinfo-data
webrick
xmlrpc (~> 0.3.3)
Expand Down
58 changes: 58 additions & 0 deletions server/lib/reverse_etl/extractors/web_scraping.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

module ReverseEtl
module Extractors
class ChunkProcessingError < StandardError; end

class WebScraping < Base
include ::Utils::Constants
# TODO: Make it as class method
def read(sync_run_id, activity)
sync_run = SyncRun.find(sync_run_id)
Expand All @@ -21,6 +24,33 @@ def read(sync_run_id, activity)

private

# Derives the chunking configuration for a sync run.
#
# Falls back to character-based defaults unless the sync has at least one
# vector mapping whose embedding model we know the token limit for; in that
# case the most restrictive (smallest-limit) model drives token chunking.
#
# Returns a Hash: either { chunk_size:, chunk_overlap: } (legacy) or
# { model:, provider:, chunk_size: } (token-based).
def generate_chunk_config(sync_run)
  default_config = { chunk_size: 1000, chunk_overlap: 200 }

  vector_mappings = sync_run.sync.configuration.select { |m| m["mapping_type"] == "vector" }
  return default_config if vector_mappings.empty?

  # Only mappings whose model has a known token limit are eligible.
  known = vector_mappings.select { |m| EMBEDDING_MODEL_TOKEN_LIMITS.key?(m["embedding_config"]["model"]) }
  strictest = known.min_by { |m| EMBEDDING_MODEL_TOKEN_LIMITS[m["embedding_config"]["model"]] }
  return default_config if strictest.nil?

  model = strictest["embedding_config"]["model"]
  {
    model: model,
    # NOTE(review): "mode" is read as the provider name — confirm the
    # embedding_config schema really stores the provider under "mode".
    provider: strictest["embedding_config"]["mode"],
    chunk_size: EMBEDDING_MODEL_TOKEN_LIMITS[model]
  }
end

# Chunks the scraped markdown using the sync's chunking configuration.
# Any failure is re-raised as ChunkProcessingError so callers deal with a
# single error type (the original error is preserved as #cause).
def generate_chunks(sync_run, markdown_content)
  config = generate_chunk_config(sync_run)
  processor = ReverseEtl::Processors::Text::ChunkProcessor.new
  processor.process(config, markdown_content)
rescue StandardError => e
  raise ReverseEtl::Extractors::ChunkProcessingError, "Failed to process file content: #{e.message}"
end

def fetch_records(sync_run)
source_client = setup_source_client(sync_run.sync)
result = source_client.read(sync_run.sync.to_protocol)
Expand All @@ -44,16 +74,44 @@ def process_result(result, sync_run)
model = sync_run.sync.model
result.each do |res|
record = res.record
chunk_records = generate_chunks(sync_run, record.data[:markdown])
chunk_records.map do |chunk_record|
new_record = build_record(chunk_record, record.data[:metadata])
fingerprint = generate_fingerprint(new_record.data)
sync_record = process_record(new_record, sync_run, model)
total_query_rows += 1
skipped_rows += update_or_create_sync_record(sync_record, new_record, sync_run, fingerprint) ? 0 : 1
end
end
sync_run.update(
current_offset: 0,
# NOTE(review): with chunking, one scraped record can yield many rows —
# should this be the total_query_rows counter rather than result.size? Verify.
total_query_rows: result.size,
skipped_rows:
)
end

# Builds a protocol RecordMessage for a single text chunk so it can flow
# through the same process_record pipeline as unchunked records.
#
# message  - chunk Hash produced by ChunkProcessor (keys "text", "element_id").
# metadata - raw metadata payload of the scraped record; assumed to be a JSON
#            string containing a "url" key — TODO confirm with the source client.
def build_record(message, metadata)
  chunk = message.with_indifferent_access
  Multiwoven::Integrations::Protocol::RecordMessage.new(
    data: {
      markdown: chunk["text"],
      markdown_hash: chunk["element_id"],
      metadata:,
      url: JSON.parse(metadata)["url"]
    },
    emitted_at: Time.zone.now.to_i
  )
end
end
end
end
60 changes: 60 additions & 0 deletions server/lib/reverse_etl/processors/text/chunk_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module ReverseEtl
module Processors
module Text
# Orchestrates text chunking: normalizes raw content, picks a chunking
# backend based on the supplied config, and returns metadata-tagged chunks.
class ChunkProcessor
  DEFAULT_CHUNK_SIZE = 1000
  DEFAULT_CHUNK_OVERLAP = 200

  # Selects the concrete chunker for the given config.
  # Legacy configs (no model/provider) use the ENV-selected processor
  # (default "langchain_rb"); model-aware configs use the token chunker.
  def chunk_processor(chunk_config)
    if legacy?(chunk_config)
      processor_type = ENV.fetch("CHUNK_PROCESSOR", "langchain_rb")
      "ReverseEtl::Processors::Text::#{processor_type.camelize}".constantize.new
    else
      ReverseEtl::Processors::Text::TokenChunker.new
    end
  end

  # Process content into chunks with metadata.
  #
  # chunk_config - Hash; :chunk_size / :chunk_overlap are defaulted here, and
  #                :model / :provider (when present) switch to token chunking.
  # content      - raw text; HTML tags are stripped and whitespace collapsed.
  # metadata     - Hash of file attributes copied onto every chunk.
  #
  # Returns an Array of Hashes (see #format_chunks).
  def process(chunk_config, content, metadata = {})
    # Fill defaults without mutating the caller's hash (the original
    # implementation used ||= and leaked defaults back to the caller).
    config = { chunk_size: DEFAULT_CHUNK_SIZE, chunk_overlap: DEFAULT_CHUNK_OVERLAP }
             .merge(chunk_config.compact)
    chunks = chunk_processor(config).process(config, cleanup(content))
    format_chunks(chunks, metadata)
  end

  private

  # Wraps each text chunk with a stable content hash and source-file metadata.
  def format_chunks(chunks, metadata)
    chunks.map do |chunk|
      {
        element_id: Digest::MD5.hexdigest(chunk), # content hash: stable id for dedup
        text: chunk,
        created_date: metadata[:file_created_date],
        modified_date: metadata[:file_modified_date],
        filename: metadata[:file_name],
        filetype: metadata[:file_type],
        created_at: Time.current
      }
    end
  end

  # Normalizes raw text before chunking.
  # FIX: the escaped-bracket artifact removal must run before whitespace
  # collapsing — previously /\\\[\\\n/ could never match because the
  # preceding gsub had already replaced every newline with a space.
  def cleanup(text)
    text
      .gsub(/<[^>]+>/, " ")  # remove HTML tags
      .gsub(/\\\[\\\n/, " ") # drop escaped-bracket line-break artifacts
      .gsub(/\s+/, " ")      # collapse whitespace and newlines
      .strip                 # trim leading/trailing spaces
  end

  # Token-aware chunking applies only when the config names a model/provider.
  def legacy?(chunk_config)
    chunk_config[:model].blank? && chunk_config[:provider].blank?
  end
end
end
end
end
60 changes: 60 additions & 0 deletions server/lib/reverse_etl/processors/text/token_chunker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module ReverseEtl
module Processors
module Text
# Splits text into chunks of at most N tokens using the tokenizer that
# matches the configured embedding model/provider, so downstream embedding
# calls never exceed a model's input-token limit.
class TokenChunker < BaseDocProcessor
  include ::Utils::Constants

  # chunk_config - Hash with :model, :provider and :chunk_size (token limit).
  # content      - text to split.
  # Returns an Array of text chunks.
  # Raises TypeError when the model/provider is missing or unsupported.
  def process(chunk_config, content)
    @model = chunk_config[:model]
    @provider = chunk_config[:provider]
    @chunk_size = chunk_config[:chunk_size]
    validate_model
    initialize_tokeniser
    tokens_to_text_chunks(get_tokens(content), @chunk_size)
  end

  private

  def open_ai?
    @provider == "open_ai"
  end

  # Decodes each slice of tokens back into a text chunk.
  # NOTE(review): on the non-OpenAI path the slices contain token strings
  # (from Tokenizers::Encoding#tokens) and the decoded result is joined with
  # " " — confirm Tokenizers#decode accepts token strings and returns
  # something that responds to #join; it may expect ids and return a String.
  def tokens_to_text_chunks(token_stream, chunk_size)
    token_stream.each_slice(chunk_size).map do |slice|
      decoded = @tokeniser.decode(slice)
      open_ai? ? decoded : decoded.join(" ")
    end
  end

  # Ensures the model is present and has a known token limit.
  def validate_model
    raise TypeError, "Model is required" if @model.blank?

    return if EMBEDDING_MODEL_TOKEN_LIMITS.key?(@model)

    raise TypeError, "Embedding model #{@model} not supported"
  end

  # Builds the provider-specific tokenizer instance.
  def initialize_tokeniser
    raise TypeError, "Provider is required" if @provider.blank?
    raise TypeError, "Provider #{@provider} not supported" unless SUPPORTED_PROVIDERS.include?(@provider)

    @tokeniser =
      if open_ai?
        Tiktoken.encoding_for_model(@model)
      else
        # assumes every supported HF model lives under sentence-transformers/ — TODO confirm
        Tokenizers::Tokenizer.from_pretrained("sentence-transformers/#{@model}")
      end
  end

  # OpenAI encoding yields token ids; the HF path yields token strings.
  def get_tokens(text)
    encoded = @tokeniser.encode(text)
    open_ai? ? encoded : encoded.tokens
  end
end
end
end
end
15 changes: 14 additions & 1 deletion server/lib/utils/constants.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

module Utils
module Constants
module Constants # rubocop:disable Metrics/ModuleLength
# whenever resource group mapping is updated
# enterprise role contracts need to be updated also
RESOURCE_GROUP_MAPPING = {
Expand Down Expand Up @@ -45,5 +45,18 @@ module Constants
description: "Manage and access the AI assistant and its configurations"
}
}.freeze

# Maximum input-token count per supported embedding model; used by the
# token chunker to size chunks so embedding requests never exceed a
# model's context window.
# NOTE(review): values appear to mirror OpenAI / sentence-transformers
# published limits — verify against current provider docs before changing.
EMBEDDING_MODEL_TOKEN_LIMITS = {
"text-embedding-3-small" => 8191,
"text-embedding-3-large" => 8191,
"text-embedding-ada-002" => 8191,
"paraphrase-MiniLM-L12-v2" => 128,
"all-MiniLM-L6-v2" => 256,
"multi-qa-MiniLM-L6-cos-v1" => 512,
"all-mpnet-base-v2" => 384,
"msmarco-MiniLM-L6-cos-v5" => 512
}.freeze

# Embedding providers the token chunker can build a tokenizer for.
SUPPORTED_PROVIDERS = %w[open_ai hugging_face].freeze
end
end
Loading
Loading