Developing an AI-Based Search Engine

Let’s see how to build a complete search engine on top of Kubernetes, with AI-personalized results.

[Figure: Complete pipeline executed by Kubeflow, responsible for orchestrating the whole system. Image by author.]

1. You Know, For Search

[Figure: Example of a document as uploaded to Elasticsearch. Image by author.]
[Figure: Example of matching between search terms and document fields. Image by author.]
[Figure: Example of properly ranked results as retrieved by Elasticsearch, running BM25 scoring over the documents stored in the database. Image by author.]
[Figure: Example of adding a performance-metrics layer to the retrieval rules. The previous second-best document rises to the top of the results. Image by author.]
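
To make this concrete, here is a minimal sketch (not part of pySearchML) of sending a match query to Elasticsearch from Python. BM25 is Elasticsearch's default similarity, so a plain match query already comes back ranked by BM25 score; the index and field names below are illustrative:

from elasticsearch import Elasticsearch

# Hypothetical client pointing at the cluster used throughout this article
es = Elasticsearch('elasticsearch.elastic-system.svc.cluster.local:9200')

response = es.search(
    index='pysearchml',
    body={'query': {'match': {'name': 'office'}}, 'size': 10}
)

for hit in response['hits']['hits']:
    print(hit['_score'], hit['_source'].get('name'))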

2. The Machine Learning Layer

[Figure: Example of the graphical model implemented in pyClickModels for finding the relevance of documents associated with search engine results. Image by author.]
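
pyClickModels fits a Dynamic Bayesian Network over clickstream data to estimate how relevant each document is for a given search context. A minimal sketch of its usage, based on how pySearchML invokes it later in the pipeline (the import line is an assumption; paths follow the pipeline's defaults):

from pyClickModels import DBN

# Fit the DBN over gzipped clickstream files and export one judgment
# (relevance estimate) per document and search context
model = DBN.DBNModel()
model.fit('/tmp/pysearchml/lambdamart0/clickstream/', iters=10)
model.export_judgments('/tmp/pysearchml/lambdamart0/model/model.gz')
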
[Figure: Example of an input file for the training step, as required by the Elasticsearch Learn2Rank plugin. Image by author.]

2.1 The Validation Framework

[Figure: Example of the validation framework in practice. For each user in the dataset, we use their search term to retrieve results from ES with the ranking model already applied; we then take the average rank position of the items each user purchased within the ES results. Image by author.]
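
As a rough illustration of the metric for a single query (positions counted from zero and normalized by the list length, mirroring the compute_rank function shown later in the Kubeflow section):

# Retrieved document ids, in ranked order, for one user's search
retrieved = ['GGOEA', 'GGOEB', 'GGOEC', 'GGOED', 'GGOEE']
purchased = {'GGOEC'}

# Rank positions (0-based) of the purchased items within the results
ranks = [i for i, doc in enumerate(retrieved) if doc in purchased]

# Normalized average rank: 0 means purchases sit at the very top,
# values near 1 mean they sit at the bottom. Lower is better.
avg_rank = sum(r / (len(retrieved) - 1) for r in ranks) / len(ranks)
print(avg_rank)  # 0.5 for this example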

3. Kubeflow Orchestration

@dsl.pipeline()
def build_pipeline(
    bucket='pysearchml',
    es_host='elasticsearch.elastic-system.svc.cluster.local:9200',
    force_restart=False,
    train_init_date='20160801',
    train_end_date='20160801',
    validation_init_date='20160802',
    validation_end_date='20160802',
    test_init_date='20160803',
    test_end_date='20160803',
    model_name='lambdamart0',
    ranker='lambdamart',
    index='pysearchml'
):
    pvc = dsl.PipelineVolume(pvc='pysearchml-nfs')

    prepare_op = dsl.ContainerOp(
        name='prepare env',
        image=f'gcr.io/{PROJECT_ID}/prepare_env',
        arguments=[f'--force_restart={force_restart}', f'--es_host={es_host}', f'--bucket={bucket}', f'--model_name={model_name}'],
        pvolumes={'/data': pvc}
    )

    val_reg_dataset_op = dsl.ContainerOp(
        name='validation regular dataset',
        image=f'gcr.io/{PROJECT_ID}/data_validation',
        arguments=[f'--bucket={bucket}/validation/regular', f'--validation_init_date={validation_init_date}', f'--validation_end_date={validation_end_date}', f'--destination=/data/pysearchml/{model_name}/validation_regular'],
        pvolumes={'/data': pvc}
    ).set_display_name('Build Regular Validation Dataset').after(prepare_op)

    val_train_dataset_op = dsl.ContainerOp(
        name='validation train dataset',
        image=f'gcr.io/{PROJECT_ID}/data_validation',
        arguments=[f'--bucket={bucket}/validation/train', f'--validation_init_date={train_init_date}', f'--validation_end_date={train_end_date}', f'--destination=/data/pysearchml/{model_name}/validation_train'],
        pvolumes={'/data': pvc}
    ).set_display_name('Build Train Validation Dataset').after(prepare_op)

    val_test_dataset_op = dsl.ContainerOp(
        name='validation test dataset',
        image=f'gcr.io/{PROJECT_ID}/data_validation',
        arguments=[f'--bucket={bucket}/validation/test', f'--validation_init_date={test_init_date}', f'--validation_end_date={test_end_date}', f'--destination=/data/pysearchml/{model_name}/validation_test'],
        pvolumes={'/data': pvc}
    ).set_display_name('Build Test Validation Dataset').after(prepare_op)

    train_dataset_op = dsl.ContainerOp(
        name='train dataset',
        image=f'gcr.io/{PROJECT_ID}/data_train',
        command=['python', '/train/run.py'],
        arguments=[f'--bucket={bucket}', f'--train_init_date={train_init_date}', f'--train_end_date={train_end_date}', f'--es_host={es_host}', f'--model_name={model_name}', f'--index={index}', f'--destination=/data/pysearchml/{model_name}/train'],
        pvolumes={'/data': pvc}
    ).set_display_name('Build Training Dataset').after(prepare_op)

    katib_op = dsl.ContainerOp(
        name='pySearchML Bayesian Optimization Model',
        image=f'gcr.io/{PROJECT_ID}/model',
        command=['python', '/model/launch_katib.py'],
        arguments=[f'--es_host={es_host}', f'--model_name={model_name}', f'--ranker={ranker}', '--name=pysearchml', f'--train_file_path=/data/pysearchml/{model_name}/train/train_dataset.txt', f'--validation_files_path=/data/pysearchml/{model_name}/validation_regular', f'--validation_train_files_path=/data/pysearchml/{model_name}/validation_train', f'--destination=/data/pysearchml/{model_name}/'],
        pvolumes={'/data': pvc}
    ).set_display_name('Katib Optimization Process').after(
        val_reg_dataset_op, val_train_dataset_op, val_test_dataset_op, train_dataset_op
    )

    post_model_op = dsl.ContainerOp(
        name='Post Best RankLib Model to ES',
        image=f'gcr.io/{PROJECT_ID}/model',
        command=['python', '/model/post_model.py'],
        arguments=[f'--es_host={es_host}', f'--model_name={model_name}', f'--destination=/data/pysearchml/{model_name}/best_model.txt'],
        pvolumes={'/data': pvc}
    ).set_display_name('Post RankLib Model to ES').after(katib_op)

    _ = dsl.ContainerOp(
        name='Test Model',
        image=f'gcr.io/{PROJECT_ID}/model',
        command=['python', '/model/test.py'],
        arguments=[f'--files_path=/data/pysearchml/{model_name}/validation_test', f'--index={index}', f'--es_host={es_host}', f'--model_name={model_name}'],
        pvolumes={'/data': pvc}
    ).set_display_name('Run Test Step').after(post_model_op)
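
With the pipeline function defined, compiling and submitting it through the KFP SDK looks roughly like this (a sketch; pySearchML's own launcher may differ, and the host assumes the port-forward shown in the hands-on section):

import kfp
import kfp.compiler

# Compile the pipeline function into a Kubeflow package and submit a run
kfp.compiler.Compiler().compile(build_pipeline, 'pipeline.tar.gz')
client = kfp.Client(host='http://localhost:8080')
client.create_run_from_pipeline_package('pipeline.tar.gz', arguments={})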

[Figure: The pipeline step by step, as implemented in pySearchML. Image by author.]

1. prepare_env

prepare_op = dsl.ContainerOp(
    name='prepare env',
    image=f'gcr.io/{PROJECT_ID}/prepare_env',
    arguments=[f'--force_restart={force_restart}', f'--es_host={es_host}', f'--bucket={bucket}', f'--model_name={model_name}'],
    pvolumes={'/data': pvc}
)

FROM python:3.7.7-alpine3.12 as python

COPY kubeflow/components/prepare_env /prepare_env
WORKDIR /prepare_env
COPY ./key.json .

ENV GOOGLE_APPLICATION_CREDENTIALS=./key.json

RUN pip install -r requirements.txt

ENTRYPOINT ["python", "run.py"]

{
    "query": {
        "bool": {
            "minimum_should_match": 1,
            "should": [
                {
                    "match": {
                        "name": "{{search_term}}"
                    }
                }
            ]
        }
    },
    "params": ["search_term"],
    "name": "BM25 name"
}
{
    "query": {
        "function_score": {
            "query": {
                "match_all": {}
            },
            "script_score" : {
                "script" : {
                    "params": {
                        "channel_group": "{{channel_group}}"
                    },
                    "source": "if (params.channel_group == 'paid_search') { return doc['performances.channel.paid_search.CTR'].value * 10 } else if (params.channel_group == 'referral') { return doc['performances.channel.referral.CTR'].value * 10 } else if (params.channel_group == 'organic_search') { return doc['performances.channel.organic_search.CTR'].value * 10 } else if (params.channel_group == 'social') { return doc['performances.channel.social.CTR'].value * 10 } else if (params.channel_group == 'display') { return doc['performances.channel.display.CTR'].value * 10 } else if (params.channel_group == 'direct') { return doc['performances.channel.direct.CTR'].value * 10 } else if (params.channel_group == 'affiliates') { return doc['performances.channel.affiliates.CTR'].value * 10 }"
                }
            }
        }
    },
    "params": ["channel_group"],
    "name": "channel_group"
}
{
    "query": {
        "function_score": {
            "query": {
                "match_all": {}
            },
            "script_score" : {
                "script" : {
                    "params": {
                        "customer_avg_ticket": "{{customer_avg_ticket}}"
                    },
                    "source": "return Math.log(1 + Math.abs(doc['price'].value - Float.parseFloat(params.customer_avg_ticket)))"
                }
            }
        }
    },
    "params": ["customer_avg_ticket"],
    "name": "customer_avg_ticket"
}
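
pySearchML registers files like these as features in the Elasticsearch Learning to Rank plugin. A sketch of what that registration might look like, assuming the plugin's standard _ltr endpoints and that each file is converted into the plugin's feature format (the conversion below is an assumption, not pySearchML's exact code):

import json
import requests

ES_HOST = 'http://elasticsearch.elastic-system.svc.cluster.local:9200'

feature_files = ['name.json', 'channel_group.json', 'customer_avg_ticket.json']
features = []
for path in feature_files:
    spec = json.load(open(path))
    features.append({
        'name': spec['name'],
        'params': spec['params'],
        'template_language': 'mustache',
        'template': spec['query'],
    })

# Initialize the LTR feature store (idempotent) and create the featureset
requests.put(f'{ES_HOST}/_ltr')
requests.post(
    f'{ES_HOST}/_ltr/_featureset/pysearchml',
    json={'featureset': {'features': features}},
)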

2. Validation Datasets

{
  "search_keys": {
    "search_term": "office",
    "channel_group": "direct",
    "customer_avg_ticket": "13"
  },
  "docs": [
    {"purchased":["GGOEGOAQ012899"]}
  ]
}

3. Training Dataset

{
  "search_keys": {
    "search_term": "drinkware",
    "channel_group": "direct",
    "customer_avg_ticket": "20"
  },
  "judgment_keys": [
    {
      "session": 
        [
          {"doc":"GGOEGDHC017999","click":"0","purchase":"0"},
          {"doc":"GGOEADHB014799","click":"0","purchase":"0"},
          {"doc":"GGOEGDHQ015399","click":"1","purchase":"0"}
        ]
    }
  ]
}
{
  "search_term: bags|channel_group:organic_search|customer_avg_ticket:30": {
    "GGOEGBRJ037299": 0.3333377540111542,
    "GGOEGBRA037499": 0.222222238779068,
    "GGOEGBRJ037399": 0.222222238779068
  }
}

import gzip
import json
import os
from shutil import rmtree

import numpy as np
from pyClickModels import DBN


def build_judgment_files(model_name: str) -> None:
    model = DBN.DBNModel()
    clickstream_files_path = f'/tmp/pysearchml/{model_name}/clickstream/'
    model_path = f'/tmp/pysearchml/{model_name}/model/model.gz'
    rmtree(os.path.dirname(model_path), ignore_errors=True)
    os.makedirs(os.path.dirname(model_path))
    judgment_files_path = f'/tmp/pysearchml/{model_name}/judgments/judgments.gz'
    rmtree(os.path.dirname(judgment_files_path), ignore_errors=True)
    os.makedirs(os.path.dirname(judgment_files_path))

    # Fit the DBN click model over the clickstream and export one judgment
    # (relevance estimate) per document and search context
    model.fit(clickstream_files_path, iters=10)
    model.export_judgments(model_path)

    with gzip.GzipFile(judgment_files_path, 'wb') as f:
        for row in gzip.GzipFile(model_path):
            row = json.loads(row)
            search_keys = list(row.keys())[0]
            docs_judgments = row[search_keys]
            search_keys = dict(e.split(':') for e in search_keys.split('|'))
            judgments_list = [judge for doc, judge in docs_judgments.items()]

            # Skip contexts where every document got the same judgment:
            # they carry no ranking signal
            if all(x == judgments_list[0] for x in judgments_list):
                continue

            # Discretize the continuous DBN judgments into the 0-4 grades
            # RankLib expects, using percentile buckets
            percentiles = np.percentile(judgments_list, [20, 40, 60, 80, 100])

            judgment_keys = [{'doc': doc, 'judgment': process_judgment(percentiles, judgment)}
                             for doc, judgment in docs_judgments.items()]
            result = {'search_keys': search_keys, 'judgment_keys': judgment_keys}
            f.write(json.dumps(result).encode() + '\n'.encode())

            
def process_judgment(percentiles: list, judgment: float) -> int:
    # Map a continuous judgment onto the 0-4 grade scale via percentile buckets
    if judgment <= percentiles[0]:
        return 0
    if judgment <= percentiles[1]:
        return 1
    if judgment <= percentiles[2]:
        return 2
    if judgment <= percentiles[3]:
        return 3
    return 4
{
  "search_keys": {
    "search_term": "office",
    "channel_group": "organic_search",
    "customer_avg_ticket": "24"
  },
  "judgment_keys": [
    {"doc": "0", "judgment": 0},
    {"doc": "GGOEGAAX0081", "judgment": 4},
    {"doc": "GGOEGOAB016099", "judgment": 0}
  ]
}
0	qid:0	1:3.1791792	2:0	3:0.0	4:2.3481672
4	qid:0	1:3.0485907	2:0	3:0.0	4:2.3481672
0	qid:0	1:3.048304	2:0	3:0.0	4:0
0	qid:0	1:2.9526825	2:0	3:0.0	4:0
4	qid:1	1:2.7752903	2:0	3:0.0	4:3.61228
0	qid:1	1:2.8348017	2:0	3:0.0	4:2.3481672
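
Each line above follows RankLib's SVMrank-style format: the judgment grade, a query id grouping rows that belong to the same search context, and the numbered feature values logged from Elasticsearch. A hypothetical helper that writes such lines (the feature-logging step itself is omitted):

def to_ranklib_line(judgment: int, qid: int, features: list) -> str:
    # features holds the values logged from the LTR plugin, in the same
    # order the featureset defines them (1-indexed in the output)
    feats = '\t'.join(f'{i}:{v}' for i, v in enumerate(features, start=1))
    return f'{judgment}\tqid:{qid}\t{feats}'

print(to_ranklib_line(4, 0, [3.0485907, 0, 0.0, 2.3481672]))
# 4	qid:0	1:3.0485907	2:0	3:0.0	4:2.3481672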

4. Katib Optimization

[Figure: Illustration of Bayesian optimization. The more points the algorithm samples from the allowed domain, the closer it may get to the optimum of a given function. In pySearchML, the domain is the set of variables that determines how a ranker fits the data, and the cost function being optimized is the average rank. Image from the Wikimedia Foundation.]
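
Katib runs this optimization inside the cluster; purely for intuition, here is what the same idea looks like with a generic Bayesian-optimization library (scikit-optimize, which pySearchML does not use):

from skopt import gp_minimize

# Toy stand-in for the real objective: in pySearchML, evaluating a point
# means training a ranker with those hyperparameters and measuring the
# average rank on the validation set.
def objective(params):
    trees, shrinkage = params
    return (trees - 120) ** 2 / 1e4 + (shrinkage - 0.1) ** 2 * 100

result = gp_minimize(
    objective,
    dimensions=[(1, 500), (0.01, 0.2)],  # trees, shrinkage
    n_calls=20,
)
print(result.x, result.fun)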

{
  "apiVersion": "kubeflow.org/v1alpha3",
  "kind": "Experiment",
  "metadata": {
    "namespace": "kubeflow",
    "name": "",
    "labels": {
      "controller-tools.k8s.io": "1.0"
    }
  },
  "spec": {
    "objective": {
      "type": "minimize",
      "objectiveMetricName": "Validation-rank",
      "additionalMetricNames": [
        "rank"
      ]
    },
    "algorithm": {
      "algorithmName": "bayesianoptimization"
    },
    "parallelTrialCount": 1,
    "maxTrialCount": 2,
    "maxFailedTrialCount": 1,
    "parameters": [],
    "trialTemplate": {
      "goTemplate": {
        "rawTemplate": {
          "apiVersion": "batch/v1",
          "kind": "Job",
          "metadata":{
            "name": "{{.Trial}}",
            "namespace": "{{.NameSpace}}"
          },
          "spec": {
            "template": {
              "spec": {
                "restartPolicy": "Never",
                "containers": [
                  {
                    "name": "{{.Trial}}",
                    "image": "gcr.io/{PROJECT_ID}/model",
                    "command": [
                      "python /model/train.py --train_file_path={train_file_path} --validation_files_path={validation_files_path} --validation_train_files_path={validation_train_files_path} --es_host={es_host} --destination={destination} --model_name={model_name} --ranker={ranker} {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                    ],
                    "volumeMounts": [
                      {
                        "mountPath": "/data",
                        "name": "pysearchmlpvc",
                        "readOnly": false
                      }
                    ]
                  }
                ],
                "volumes": [
                  {
                    "name": "pysearchmlpvc",
                    "persistentVolumeClaim": {
                      "claimName": "pysearchml-nfs",
                      "readOnly": false
                    }
                  }
                ]
              }
            }
          }
        }
      }
    }
  }
}
def get_ranker_parameters(ranker: str) -> List[Dict[str, Any]]:
    return {
        'lambdamart': [
            {"name": "--tree", "parameterType": "int", "feasibleSpace": {"min": "1", "max": "500"}},
            {"name": "--leaf", "parameterType": "int", "feasibleSpace": {"min": "2", "max": "40"}},
            {"name": "--shrinkage", "parameterType": "double", "feasibleSpace": {"min": "0.01", "max": "0.2"}},
            {"name": "--tc", "parameterType": "int", "feasibleSpace": {"min": "-1", "max": "300"}},
            {"name": "--mls", "parameterType": "int", "feasibleSpace": {"min": "1", "max": "10"}}
        ]
    }.get(ranker)
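
These definitions fill the empty "parameters" list of the Experiment spec above. A sketch of how the two pieces might be joined before the experiment is submitted (the experiment_template naming is an assumption):

import json

# Load the Experiment JSON shown above and inject the search space
# for the chosen ranker; names here are illustrative
experiment = json.loads(experiment_template)
experiment['metadata']['name'] = 'pysearchml'
experiment['spec']['parameters'] = get_ranker_parameters('lambdamart')
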
# Assemble the RankLib CLI call. Katib passes hyperparameters as '--name=value'
# pairs (collected in X), which are converted to RankLib's '-name value' style.
cmd = ('java -jar ranklib/RankLib-2.14.jar -ranker '
       f'{ranker} -train {args.train_file_path} -norm sum -save '
       f'{args.destination}/model.txt '
       f'{(" ".join(X)).replace("--", "-").replace("=", " ")} -metric2t ERR')
{
    "query": {
        "function_score": {
            "query": {
                "bool": {
                    "must": {
                        "bool": {
                            "minimum_should_match": 1,
                            "should": [
                                {
                                    "multi_match": {
                                        "operator": "and",
                                        "query": "{query}",
                                        "type": "cross_fields",
                                        "fields": [
                                            "sku",
                                            "name",
                                            "category"
                                        ]
                                    }
                                }
                            ]
                        }
                    }
                }
            },
            "functions": [
                {
                    "field_value_factor": {
                        "field": "performances.global.CTR",
                        "factor": 10,
                        "missing": 0,
                        "modifier": "none"
                    }
                }
            ],
            "boost_mode": "sum",
            "score_mode": "sum"
        }
    },
    "rescore": {
        "window_size": "{window_size}",
        "query": {
            "rescore_query": {
                "sltr": {
                    "params": "{search_keys}",
                    "model": "{model_name}"
                }
            },
            "rescore_query_weight": 20,
            "query_weight": 0.1,
            "score_mode": "total"
        }
    }
}
def get_es_query(
    search_keys: Dict[str, Any],
    model_name: str,
    es_batch: int = 1000
) -> str:
    """
    Builds the Elasticsearch query to be used when retrieving data.
    Args
    ----
      args: NamedTuple
        args.search_keys: Dict[str, Any]
            Search query sent by the customer as well as other variables that sets its
            context, such as region, favorite brand and so on.
        args.model_name: str
            Name of RankLib model saved on Elasticsearch
        args.index: str
            Index on Elasticsearch where to retrieve documents
        args.es_batch: int
            How many documents to retrieve
    Returns
    -------
      query: str
          String representation of final query
    """
    # It's expected that an ES query will be available at:
    # ./queries/{model_name}/es_query.json
    query = open(f'queries/{model_name}/es_query.json').read()
    query = json.loads(query.replace('{query}', search_keys['search_term']))
    # We only retrieve each document's id, to compare the ranks of customers'
    # purchases within the retrieved results
    query['_source'] = '_id'
    query['size'] = es_batch
    query['rescore']['window_size'] = 50  # Hardcoded to optimize first 50 skus
    query['rescore']['query']['rescore_query']['sltr']['params'] = search_keys
    query['rescore']['query']['rescore_query']['sltr']['model'] = model_name
    return json.dumps(query)
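
The queries built this way are batched into the msearch body that compute_rank (below) sends: each search is a header line naming the index, followed by the query line, newline-delimited. A sketch, with the validation-file reading simplified (the file path is hypothetical):

import json

search_arr, purchase_arr = [], []
for row in map(json.loads, open('validation_dataset.json')):
    # msearch expects a header line followed by the query itself
    search_arr.append(json.dumps({'index': 'pysearchml'}))
    search_arr.append(get_es_query(row['search_keys'], 'lambdamart0'))
    purchase_arr.append(row['docs'])

These two arrays are then handed to compute_rank, defined next:
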
def compute_rank(
    search_arr: List[str],
    purchase_arr: List[List[Dict[str, List[str]]]],
    rank_num: List[float],
    rank_den: List[float],
    es_client: Elasticsearch
) -> None:
    """
    Sends queries against Elasticsearch and compares results with what customers
    purchased. Computes the average rank position of where the purchased document falls
    within the retrieved items.
    Args
    ----
      search_arr: List[str]
          Searches made by customers as observed in validation data. We send those
          against Elasticsearch and compare results with purchased data
      purchase_arr: List[List[Dict[str, List[str]]]]
          List of documents that were purchased by customers
      rank_num: List[float]
          Numerator value of the rank equation. Defined as a single-element
          list so it can be mutated in place (emulating a pointer)
      rank_den: List[float]
          Denominator of the rank equation: total count of purchased documents
          found among the retrieved results
      es_client: Elasticsearch
          Python Elasticsearch client
    """
    idx = 0
    if not search_arr:
        return

    request = os.linesep.join(search_arr)
    response = es_client.msearch(body=request, request_timeout=60)

    for hit in response['responses']:
        docs = [doc['_id'] for doc in hit['hits'].get('hits', [])]

        # Each msearch response corresponds to one entry of purchase_arr;
        # advance idx on every iteration so the two stay aligned even when
        # a response is skipped below
        purchased_docs = [
            doc for purch in purchase_arr[idx] for doc in purch['purchased']
        ]
        idx += 1

        if not docs or len(docs) < 2:
            continue

        # 0-based positions of purchased documents within the retrieved list
        ranks = np.where(np.in1d(docs, purchased_docs))[0]

        if ranks.size == 0:
            continue

        rank_num[0] += ranks.sum() / (len(docs) - 1)
        rank_den[0] += ranks.size

    print('rank num: ', rank_num[0])
    print('rank den: ', rank_den[0])

5. Post RankLib Model
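
This step uploads the best model found by Katib to Elasticsearch. The Learning to Rank plugin stores RankLib models created against a featureset; a minimal sketch of what post_model.py does conceptually (the _createmodel endpoint is the plugin's; the exact pySearchML code may differ, and the paths follow the pipeline's defaults):

import requests

ES_HOST = 'http://elasticsearch.elastic-system.svc.cluster.local:9200'
definition = open('/data/pysearchml/lambdamart0/best_model.txt').read()

requests.post(
    f'{ES_HOST}/_ltr/_featureset/pysearchml/_createmodel',
    json={
        'model': {
            'name': 'lambdamart0',
            'model': {'type': 'model/ranklib', 'definition': definition},
        }
    },
)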

6. Final Testing
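
Conceptually, the test step mirrors the validation loop: it builds msearch queries from the held-out test partition, lets the freshly posted model rescore the results and reports the final metric. A sketch, reusing the pieces shown above (search_arr, purchase_arr and es_client as built in the earlier sketch):

# Final evaluation over the held-out test partition; lower is better
rank_num, rank_den = [0.0], [0.0]
compute_rank(search_arr, purchase_arr, rank_num, rank_den, es_client)
print('test average rank:', rank_num[0] / max(rank_den[0], 1))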

4. Hands-On Section

[Figure: Example of a GCP project dashboard. Image by author.]
gcloud auth login
git clone https://github.com/WillianFuks/pySearchML.git && cd pySearchML
#!/bin/bash
set -e

SUBSTITUTIONS=\
_COMPUTE_ZONE='us-central1-a',\
_CLUSTER_NAME='pysearchml',\
_VERSION='0.0.0'

./kubeflow/build/manage_service_account.sh

gcloud builds submit --no-source --config kubeflow/build/cloudbuild.yaml --substitutions $SUBSTITUTIONS --timeout=2h
./kubeflow/build/build.sh
[Figure: Steps executed on Cloud Build. Image by author.]
[Figure: Kubernetes cluster deployed to GKE, ready to run. Image by author.]

gcloud container clusters get-credentials pysearchml
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80 1>/dev/null &
[Figure: Kubeflow Dashboard. Image by author.]
[Figure: Full execution of the entire pipeline as defined in pySearchML. Image by author.]
[Figure: Example of output as printed by the Katib component. Image by author.]
[Figure: Example of output as printed by the test component. Image by author.]
kubectl apply -f kubeflow/disk-busybox.yaml
kubectl -n kubeflow exec -it nfs-busybox-(...) sh
kubectl port-forward service/front -n front 8088:8088 1>/dev/null &
[Figure: Front-end interface for running queries on the newly trained Elasticsearch. Image by author.]

5. Conclusion
