Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion nuclia_e2e/nuclia_e2e/tests/test_kb_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@
pagination=Pagination(limit=100),
),
)
print(f"activity_log_is_stored -----> {len(logs.data)}")
if len(logs.data) >= 2:
# as the asks may be retried more than once (because sometimes rephrase doesn't work)
# we need to check the last logs. The way the tests are setup if we reach here is because we
Expand All @@ -662,12 +663,17 @@
and logs.data[-1].question == TEST_CHOCO_ASK_MORE
):
return (True, logs)
from pprint import pprint # noqa: T203,I001,RUF100

print("----------------------------")
pprint(logs.data) # noqa: T203,I001,RUF100
print("----------------------------")
return (False, None)

return condition

success, logs = await wait_for(activity_log_is_stored(), max_wait=180, logger=logger)
success, logs = await wait_for(activity_log_is_stored(), max_wait=360, logger=logger)
assert success, "Activity logs didn't get stored in time"

Check failure on line 676 in nuclia_e2e/nuclia_e2e/tests/test_kb_features.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_kb_features.test_kb_features[gke-stage-1]

AssertionError: Activity logs didn't get stored in time assert False
Raw output
request = <FixtureRequest for <Function test_kb_features[gke-stage-1]>>
regional_api_config = ZoneConfig(name='gke-stage-1', zone_slug='europe-1', test_kb_slug='nuclia-e2e-live-europe-1', permanent_nua_key='eyJhb...1f-4197-be76-6fc611082fe8', grafana_url='http://platform.grafana.nuclia.com', tempo_datasource_id='P95F6455D1776E941'))

    @pytest.mark.asyncio_cooperative
    async def test_kb_features(request: pytest.FixtureRequest, regional_api_config):
        """
        This test simulates a typical sequence of operations within a KnowledgeBox (KB).
    
        Instead of breaking these into separate atomic tests, we run them together to optimize resource usage.
        Creating a new KB for each atomic test would be inefficient, so this approach allows us to test
        functionality on a freshly created KB without excessive overhead. The key advantage of testing on a new
        KB is the ability to catch errors that might not appear when using an existing KB with older parameters.
    
        The test consists of both sequential and concurrent operations. Some steps must be executed in sequence
        to ensure the correct state before proceeding, while others can run in parallel. This is why we utilize
        gather in certain parts of the test.
        """
    
        def logger(msg):
            print(f"{request.node.name} ::: {msg}")
    
        zone = regional_api_config.zone_slug
        auth = get_auth()
        kb_slug = f"{regional_api_config.test_kb_slug}-test_kb_features"
    
        # Make sure the kb used for this test is deleted, as the slug is reused:
        old_kbid = await get_kbid_from_slug(regional_api_config.zone_slug, kb_slug)
        if old_kbid is not None:
            await AsyncNucliaKBS().delete(zone=regional_api_config.zone_slug, id=old_kbid)
    
    # Creates a brand new kb that will be used throughout this test
        kbid = await run_test_kb_creation(regional_api_config, kb_slug, logger)
    
        # Configures a nucliadb client defaulting to a specific kb, to be used
        # to override all the sdk endpoints that automagically creates the client
        # as this is incompatible with the cooperative tests
        async_ndb = get_async_kb_ndb_client(zone, kbid, user_token=auth._config.token)
    
        # Import a preexisting export containing several resources (coming from the financial-news kb)
        # and wait for the resources to be completely imported
        await run_test_import_kb(regional_api_config, async_ndb, logger)
    
    # Create a labeller configuration, with the goal of testing two things:
        # - labelling of existing resources (the ones imported)
    # - labelling of new resources (will be created later)
        #
        # Add a new embedding model to the KB and start a task to compute all data
        # with the new embedding model. This will test the data flow between
        # nucliadb and learning to stream all KB data to reprocess with the new
        # embedding model and ingest/index in nucliadb again.
        # We want to do it before upload to validate, on one side the migration
        # itself, and on the other ingestion in a KB with multiple vectorsets
        #
        # Create a labeller configuration that runs only on new resources that have a specific label
        # This will test the filtering capabilities of the labeller
        #
        (_, embedding_migration_task_id, _, ask_task_id) = await asyncio.gather(
            run_test_create_da_labeller(regional_api_config, async_ndb, logger),
            run_test_start_embedding_model_migration_task(async_ndb),
            run_test_create_da_labeller_with_label_filter(regional_api_config, async_ndb, logger),
            run_test_da_ask_worker(regional_api_config, async_ndb, logger),
        )
    
        # Upload a new resource and validate that is correctly processed and stored in nuclia
    # Also check that its indexes are available, by checking the amount of extracted paragraphs
        await run_test_upload_and_process(regional_api_config, async_ndb, logger)
    
        # Wait for both labeller task results to be consolidated in nucliadb while we also run semantic search
    # These /find and /ask requests are crafted so they trigger all the existing calls to predict features
        # We wait until find succeeds to run the ask tests to maximize the chances that all indexes will be
        # available and so minimize the llm costs retrying
        await asyncio.gather(
            run_test_check_da_labeller_output(regional_api_config, async_ndb, logger),
            run_test_check_embedding_model_migration(async_ndb, embedding_migration_task_id, logger),
            run_test_find(async_ndb),
            run_test_graph(async_ndb, kbid),
            run_test_check_da_labeller_with_label_filter_output(regional_api_config, async_ndb, logger),
            run_test_check_da_ask_output(regional_api_config, ask_task_id, async_ndb, logger),
        )
        await asyncio.gather(
            run_test_ask(async_ndb),
            run_test_ask_query_image(async_ndb),
        )
    
        # Validate that all the data about the usage we generated is correctly stored on the activity log
        # and can be queried, and that the remi quality metrics. Even if the remi metrics won't be computed until
        # the activity log is stored, the test_activity_log tests several things aside the ask events (the ones
        # affecting the remi queries) and so we can benefit of running them in parallel.
>       await asyncio.gather(
            run_test_activity_log(regional_api_config, async_ndb, logger),
            run_test_remi_query(regional_api_config, async_ndb, logger),
        )

nuclia_e2e/nuclia_e2e/tests/test_kb_features.py:873: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

regional_api_config = ZoneConfig(name='gke-stage-1', zone_slug='europe-1', test_kb_slug='nuclia-e2e-live-europe-1', permanent_nua_key='eyJhb...1f-4197-be76-6fc611082fe8', grafana_url='http://platform.grafana.nuclia.com', tempo_datasource_id='P95F6455D1776E941'))
ndb = <nuclia_e2e.utils.Retriable object at 0x7fd20502d090>
logger = <function test_kb_features.<locals>.logger at 0x7fd203bd3370>

    async def run_test_activity_log(regional_api_config, ndb, logger):
        kb = AsyncNucliaKB()
        now = datetime.now(tz=timezone.utc)
    
        def activity_log_is_stored():
            @wraps(activity_log_is_stored)
            async def condition() -> tuple[bool, Any]:
                logs = await kb.logs.query(
                    ndb=ndb,
                    type=EventType.ASK,
                    query=ActivityLogsAskQuery(
                        year_month=f"{now.year}-{now.month:02}",
                        filters=QueryFiltersAsk(),  # type: ignore[call-arg]
                        pagination=Pagination(limit=100),
                    ),
                )
                print(f"activity_log_is_stored -----> {len(logs.data)}")
                if len(logs.data) >= 2:
                    # as the asks may be retried more than once (because sometimes rephrase doesn't work)
                    # we need to check the last logs. The way the tests are setup if we reach here is because we
                    # validated that we got the expected results on ask, so the log should match this reasoning.
                    if (
                        logs.data[-2].question == TEST_CHOCO_QUESTION
                        and logs.data[-1].question == TEST_CHOCO_ASK_MORE
                    ):
                        return (True, logs)
                    from pprint import pprint  # noqa: T203,I001,RUF100
    
                    print("----------------------------")
                    pprint(logs.data)  # noqa: T203,I001,RUF100
                    print("----------------------------")
                return (False, None)
    
            return condition
    
        success, logs = await wait_for(activity_log_is_stored(), max_wait=360, logger=logger)
>       assert success, "Activity logs didn't get stored in time"
E       AssertionError: Activity logs didn't get stored in time
E       assert False

nuclia_e2e/nuclia_e2e/tests/test_kb_features.py:676: AssertionError

# if we have the ask events, we must have the find ones, as they have been done earlier.
logs = await kb.logs.query(
Expand Down
Loading