Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Debug wait for activity log stored #218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Uh oh!
There was an error while loading. Please reload this page.
Debug wait for activity log stored #218
Changes from all commits
b94592e587653bf1b4bc3287d46a9e662dbe011a15a8ed662d50f995d15ae90455d01f
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
There are no files selected for viewing
Check failure on line 676 in nuclia_e2e/nuclia_e2e/tests/test_kb_features.py
test_kb_features.test_kb_features[gke-stage-1]
Raw output
request = <FixtureRequest for <Function test_kb_features[gke-stage-1]>> regional_api_config = ZoneConfig(name='gke-stage-1', zone_slug='europe-1', test_kb_slug='nuclia-e2e-live-europe-1', permanent_nua_key='eyJhb...1f-4197-be76-6fc611082fe8', grafana_url='http://platform.grafana.nuclia.com', tempo_datasource_id='P95F6455D1776E941')) @pytest.mark.asyncio_cooperative async def test_kb_features(request: pytest.FixtureRequest, regional_api_config): """ This test simulates a typical sequence of operations within a KnowledgeBox (KB). Instead of breaking these into separate atomic tests, we run them together to optimize resource usage. Creating a new KB for each atomic test would be inefficient, so this approach allows us to test functionality on a freshly created KB without excessive overhead. The key advantage of testing on a new KB is the ability to catch errors that might not appear when using an existing KB with older parameters. The test consists of both sequential and concurrent operations. Some steps must be executed in sequence to ensure the correct state before proceeding, while others can run in parallel. This is why we utilize gather in certain parts of the test. 
""" def logger(msg): print(f"{request.node.name} ::: {msg}") zone = regional_api_config.zone_slug auth = get_auth() kb_slug = f"{regional_api_config.test_kb_slug}-test_kb_features" # Make sure the kb used for this test is deleted, as the slug is reused: old_kbid = await get_kbid_from_slug(regional_api_config.zone_slug, kb_slug) if old_kbid is not None: await AsyncNucliaKBS().delete(zone=regional_api_config.zone_slug, id=old_kbid) # Creates a brand new kb that will be used troughout this test kbid = await run_test_kb_creation(regional_api_config, kb_slug, logger) # Configures a nucliadb client defaulting to a specific kb, to be used # to override all the sdk endpoints that automagically creates the client # as this is incompatible with the cooperative tests async_ndb = get_async_kb_ndb_client(zone, kbid, user_token=auth._config.token) # Import a preexisting export containing several resources (coming from the financial-news kb) # and wait for the resources to be completely imported await run_test_import_kb(regional_api_config, async_ndb, logger) # Create a labeller configuration, with the goal of testing two tings: # - labelling of existing resources (the ones imported) # - labelling of new resources(will be created later) # # Add a new embedding model to the KB and start a task to compute all data # with the new embedding model. This will test the data flow between # nucliadb and learning to stream all KB data to reprocess with the new # embedding model and ingest/index in nucliadb again. 
# We want to do it before upload to validate, on one side the migration # itself, and on the other ingestion in a KB with multiple vectorsets # # Create a labeller configuration that runs only on new resources that have a specific label # This will test the filtering capabilities of the labeller # (_, embedding_migration_task_id, _, ask_task_id) = await asyncio.gather( run_test_create_da_labeller(regional_api_config, async_ndb, logger), run_test_start_embedding_model_migration_task(async_ndb), run_test_create_da_labeller_with_label_filter(regional_api_config, async_ndb, logger), run_test_da_ask_worker(regional_api_config, async_ndb, logger), ) # Upload a new resource and validate that it is correctly processed and stored in nuclia # Also check that its indexes are available, by checking the amount of extracted paragraphs await run_test_upload_and_process(regional_api_config, async_ndb, logger) # Wait for both labeller task results to be consolidated in nucliadb while we also run semantic search # These /find and /ask requests are crafted so they trigger all the existing calls to predict features # We wait until find succeeds to run the ask tests to maximize the chances that all indexes will be # available and so minimize the llm costs retrying await asyncio.gather( run_test_check_da_labeller_output(regional_api_config, async_ndb, logger), run_test_check_embedding_model_migration(async_ndb, embedding_migration_task_id, logger), run_test_find(async_ndb), run_test_graph(async_ndb, kbid), run_test_check_da_labeller_with_label_filter_output(regional_api_config, async_ndb, logger), run_test_check_da_ask_output(regional_api_config, ask_task_id, async_ndb, logger), ) await asyncio.gather( run_test_ask(async_ndb), run_test_ask_query_image(async_ndb), ) # Validate that all the data about the usage we generated is correctly stored on the activity log # and can be queried, and that the remi quality metrics. 
Even if the remi metrics won't be computed until # the activity log is stored, the test_activity_log tests several things aside from the ask events (the ones # affecting the remi queries) and so we can benefit from running them in parallel. > await asyncio.gather( run_test_activity_log(regional_api_config, async_ndb, logger), run_test_remi_query(regional_api_config, async_ndb, logger), ) nuclia_e2e/nuclia_e2e/tests/test_kb_features.py:873: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ regional_api_config = ZoneConfig(name='gke-stage-1', zone_slug='europe-1', test_kb_slug='nuclia-e2e-live-europe-1', permanent_nua_key='eyJhb...1f-4197-be76-6fc611082fe8', grafana_url='http://platform.grafana.nuclia.com', tempo_datasource_id='P95F6455D1776E941')) ndb = <nuclia_e2e.utils.Retriable object at 0x7fd20502d090> logger = <function test_kb_features.<locals>.logger at 0x7fd203bd3370> async def run_test_activity_log(regional_api_config, ndb, logger): kb = AsyncNucliaKB() now = datetime.now(tz=timezone.utc) def activity_log_is_stored(): @wraps(activity_log_is_stored) async def condition() -> tuple[bool, Any]: logs = await kb.logs.query( ndb=ndb, type=EventType.ASK, query=ActivityLogsAskQuery( year_month=f"{now.year}-{now.month:02}", filters=QueryFiltersAsk(), # type: ignore[call-arg] pagination=Pagination(limit=100), ), ) print(f"activity_log_is_stored -----> {len(logs.data)}") if len(logs.data) >= 2: # as the asks may be retried more than once (because sometimes rephrase doesn't always work) # we need to check the last logs. The way the tests are set up if we reach here is because we # validated that we got the expected results on ask, so the log should match this reasoning. 
if ( logs.data[-2].question == TEST_CHOCO_QUESTION and logs.data[-1].question == TEST_CHOCO_ASK_MORE ): return (True, logs) from pprint import pprint # noqa: T203,I001,RUF100 print("----------------------------") pprint(logs.data) # noqa: T203,I001,RUF100 print("----------------------------") return (False, None) return condition success, logs = await wait_for(activity_log_is_stored(), max_wait=360, logger=logger) > assert success, "Activity logs didn't get stored in time" E AssertionError: Activity logs didn't get stored in time E assert False nuclia_e2e/nuclia_e2e/tests/test_kb_features.py:676: AssertionError
Uh oh!
There was an error while loading. Please reload this page.