From 58ae587e29f114110824fdc8ea0b398d422bc0da Mon Sep 17 00:00:00 2001 From: maddievn Date: Wed, 21 Feb 2024 13:39:43 +0200 Subject: [PATCH] Switch pg_vector for Elasticsearch for duo --- ...reate_db_embeddings_per_doc_file_worker.rb | 52 +++----------- .../create_embeddings_records_worker.rb | 72 +------------------ ee/lib/gitlab/llm/tanuki_bot.rb | 64 ++++++----------- 3 files changed, 34 insertions(+), 154 deletions(-) diff --git a/ee/app/workers/llm/embedding/gitlab_documentation/create_db_embeddings_per_doc_file_worker.rb b/ee/app/workers/llm/embedding/gitlab_documentation/create_db_embeddings_per_doc_file_worker.rb index 9aa26e9055d1e4..75364cba9ec990 100644 --- a/ee/app/workers/llm/embedding/gitlab_documentation/create_db_embeddings_per_doc_file_worker.rb +++ b/ee/app/workers/llm/embedding/gitlab_documentation/create_db_embeddings_per_doc_file_worker.rb @@ -7,6 +7,7 @@ class CreateDbEmbeddingsPerDocFileWorker include ApplicationWorker include Gitlab::ExclusiveLeaseHelpers include EmbeddingsWorkerContext + include ::Elasticsearch::Model::Client::ClassMethods idempotent! data_consistency :always # rubocop: disable SidekiqLoadBalancing/WorkerDataConsistency @@ -27,60 +28,27 @@ def perform(filename, update_version) content = File.read(filename) source = filename.gsub(Rails.root.to_s, '') - # This worker needs to be idempotent, so that in case of a failure, if this worker is re-run, we make - # sure we do not create duplicate entries for the same file. For that reason, we cleanup any records - # for the passed in filename and given update_version. - file_embeddings = MODEL.select(:id).for_source(source).for_version(update_version) - file_embeddings.each_batch(of: BATCH_SIZE) { |batch| batch.delete_all } - items = ::Gitlab::Llm::Embeddings::Utils::DocsContentParser.parse_and_split(content, source, DOC_DIRECTORY) - # By default VertexAI has 600 requests per minute(i.e. 
10 req/sec) quota for embeddings endpoint based on - # https://cloud.google.com/vertex-ai/docs/quotas#request_quotas, - # so let's schedule roughly ~7 jobs per second for now. That is what items.each_slice(EMBEDDINGS_PER_SECOND) - # does here. - # - # We should consider filling a quota increase request when we know more about the overall embeddings usage, - # but we still want these requests throttled. items.each_slice(EMBEDDINGS_PER_SECOND) do |batch| - records = batch.map { |item| build_embedding_record(item) } - bulk_create_records(records) + batch.each { |item| index_item(item) } end end private - attr_reader :update_version, :filename - - def bulk_create_records(records) - embedding_ids = MODEL.bulk_insert!(records, returns: :ids) - logger.info( - structured_payload( - message: 'Creating DB embedding records', - filename: filename, - new_embeddings: embedding_ids, - new_version: update_version - ) - ) + attr_reader :filename - embedding_ids.each do |record_id| - delay = embedding_delay(key: :enqueue_set_embeddings_on_db_record, start_in: 3.minutes, ttl: 3.minutes) - SetEmbeddingsOnTheRecordWorker.perform_in(delay, record_id, update_version) - end - end - - def build_embedding_record(item) - current_time = Time.current - - ::Embedding::Vertex::GitlabDocumentation.new( - created_at: current_time, - updated_at: current_time, - embedding: item[:embedding], + def index_item(item) + body = { metadata: item[:metadata], content: item[:content], url: item[:url], - version: update_version - ) + created_at: Time.current, + updated_at: Time.current + } + + client.index(index: 'gitlab-development-duo', body: body, pipeline: 'elser-generate-embeddings') end end end diff --git a/ee/app/workers/llm/embedding/gitlab_documentation/create_embeddings_records_worker.rb b/ee/app/workers/llm/embedding/gitlab_documentation/create_embeddings_records_worker.rb index bd372de9eea299..81f166eaf644a4 100644 --- 
a/ee/app/workers/llm/embedding/gitlab_documentation/create_embeddings_records_worker.rb +++ b/ee/app/workers/llm/embedding/gitlab_documentation/create_embeddings_records_worker.rb @@ -20,91 +20,23 @@ def perform return unless ::Feature.enabled?(:ai_duo_chat_switch, type: :ops) return unless ::License.feature_available?(:ai_chat) # license check - embeddings_sources = extract_embedding_sources - files.each do |filename| - content = File.read(filename) - source = filename.gsub(Rails.root.to_s, '') - - next unless embeddable?(content) - - current_md5sum = extract_md5sum(embeddings_sources, source) - - # Create the digest by concatenating the file content and the model used, so that we generate new embeddings - # if either change - new_md5sum = OpenSSL::Digest::SHA256.hexdigest( - content + ::Gitlab::Llm::VertexAi::ModelConfigurations::TextEmbeddings::NAME - ) - - # If the file digest did not change, then there's no need to rebuild its embeddings, just used them as is. - next if new_md5sum == current_md5sum - - CreateDbEmbeddingsPerDocFileWorker.perform_async(filename, update_version) + CreateDbEmbeddingsPerDocFileWorker.perform_async(filename, nil) logger.info( structured_payload( message: 'Enqueued DB embeddings creation', - filename: filename, - new_version: update_version + filename: filename ) ) end - - cleanup_embeddings_for_missing_files(embeddings_sources) end private - def extract_embedding_sources - embeddings_sources = Set.new - select_columns = "distinct version, metadata->>'source' as source, metadata->>'md5sum' as md5sum" - - MODEL.select(select_columns).each_batch do |batch| - data = batch.map do |em| - { version: em.version, source: em.source, md5sum: em.md5sum }.with_indifferent_access - end - - embeddings_sources.merge(data) - end - - embeddings_sources.group_by { |em| em[:source] } - end - - def extract_md5sum(embeddings_sources, source) - embeddings_for_source = embeddings_sources.delete(source) - embedding = embeddings_for_source&.find { |embedding|
embedding[:version] == MODEL.current_version } - - embedding&.dig('md5sum') - end - - def embeddable?(content) - return false if content.empty? - return false if content.include?('This document was moved to [another location]') - - true - end - - def cleanup_embeddings_for_missing_files(embeddings_sources) - embeddings_sources.keys.each_slice(20) do |sources| - MODEL.for_sources(sources).each_batch(of: BATCH_SIZE) { |batch| batch.delete_all } - - logger.info( - structured_payload( - message: 'Deleting embeddings for missing files', - filename: sources, - new_version: MODEL.current_version - ) - ) - end - end - def files Dir[Rails.root.join("#{DOC_DIRECTORY}/**/*.md")] end - - def update_version - @update_version ||= MODEL.current_version + 1 - end end end end diff --git a/ee/lib/gitlab/llm/tanuki_bot.rb b/ee/lib/gitlab/llm/tanuki_bot.rb index ff09ffffcd0827..f1a335c416b820 100644 --- a/ee/lib/gitlab/llm/tanuki_bot.rb +++ b/ee/lib/gitlab/llm/tanuki_bot.rb @@ -35,56 +35,20 @@ def initialize(current_user:, question:, logger: nil, tracking_context: {}) def execute(&block) return empty_response unless question.present? - return empty_response unless self.class.enabled_for?(user: current_user) - unless ::Embedding::Vertex::GitlabDocumentation.table_exists? - logger.info_or_debug(current_user, message: "Embeddings database does not exist") - - return unsupported_response - end - - unless ::Embedding::Vertex::GitlabDocumentation.any? - logger.info_or_debug(current_user, message: "Need to query docs but no embeddings are found") - - return empty_response - end - - embedding = embedding_for_question(question) - return empty_response if embedding.nil? - - search_documents = get_nearest_neighbors(embedding) + search_documents = get_nearest_neighbors(question) return empty_response if search_documents.empty? get_completions(search_documents, &block) end - # Note: a Rake task is using this method to extract embeddings for a test fixture. 
- def embedding_for_question(question) - result = vertex_client.text_embeddings(content: question) - - if !result.success? || !result.has_key?('predictions') - logger.info_or_debug(current_user, message: "Could not generate embeddings", - error: result.dig('error', 'message')) - nil - else - result['predictions'].first&.dig('embeddings', 'values') - end - end - - # Note: a Rake task is using this method to extract embeddings for a test fixture. - def get_nearest_neighbors(embedding) - ::Embedding::Vertex::GitlabDocumentation.current.neighbor_for( - embedding, - limit: RECORD_LIMIT - ).map do |item| - item.metadata['source_url'] = item.url - - content = Gitlab::Llm::Embeddings::Utils::DocsAbsoluteUrlConverter.convert(item.content, item.url) + def get_nearest_neighbors(question) + search_documents(question).map do |item| + source = item['_source'] { - id: item.id, - content: content, - metadata: item.metadata + content: Gitlab::Llm::Embeddings::Utils::DocsAbsoluteUrlConverter.convert(source['content'], source['url']), + metadata: source['metadata'].merge('source_url' => source['url']) } end end @@ -137,6 +101,22 @@ def unsupported_response "Your feedback is welcome.") ) end + + def search_documents(question) + body = { + query: { + text_expansion: { + content_embedding: { + model_id: '.elser_model_2', + model_text: question + } + } + }, + _source: ['content', 'metadata.*', 'url'] + } + + ::Gitlab::Elastic::Helper.default.client.search(index: 'gitlab-development-duo', body: body).dig('hits', 'hits') + end end end end -- GitLab