Skip to content

reconciler.webutils

The web utilities module, for performing and parsing queries.

perform_query()

Make a post request to the reconciliation API

Parameters:

Name Type Description Default
query_string str

A string corresponding to the query JSON.

required
reconciliation_endpoint str

A url to the reconciliation endpoint.

required

Returns:

Name Type Description
dict

A dictionary (JSON) with the query results.

Raises:

Type Description
requests.HTTPError

The query returned an error, check if you mistyped an argument.

requests.ConnectionError

Couldn't connect to the reconciliation server.

Source code in reconciler/webutils.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@lru_cache(maxsize=None)
def perform_query(query_string, reconciliation_endpoint, max_tries=10):
    """Make a post request to the reconciliation API

    The request is retried while the connection fails or the server
    answers with HTTP 503, up to ``max_tries`` attempts in total.
    The function is wrapped in ``lru_cache``, so its arguments must be
    hashable and repeated identical calls reuse the stored result.

    Args:
        query_string (str): A string corresponding to the query JSON.
        reconciliation_endpoint (str): A url to the reconciliation endpoint.
        max_tries (int): Maximum number of attempts before giving up.
            Defaults to 10.

    Returns:
        dict: A dictionary (JSON) with the query results.

    Raises:
        requests.HTTPError: The query returned an error, check if you mistyped an argument.
        requests.ConnectionError: Couldn't connect to the reconciliation
            server within ``max_tries`` attempts.

    """

    tries = 0
    while tries < max_tries:
        try:
            response = http.post(
                reconciliation_endpoint, data=json.loads(query_string)
            )
        except requests.ConnectionError:
            response = None

        # HTTP 503 (Service Unavailable) is retried exactly like a failed
        # connection; any other response is parsed and returned or rejected.
        if response is not None and response.status_code != 503:
            query_result = response.json()
            if "status" in query_result and query_result["status"] == "error":
                raise requests.HTTPError(
                    "The query returned an error, check if you mistyped an argument."
                )
            return query_result

        tries += 1
        logger.warning('Encountered an error trying again (tries=%s)', tries)

    # Reaching this point means every attempt failed (or none was allowed),
    # so always raise instead of silently returning None.
    logger.warning('Too many errors (%s) while talking to reconcilitation server', max_tries)
    raise requests.ConnectionError("Couldn't connect to reconciliation server")

return_reconciled_raw()

Send reformatted dict for reconciliation

This is just a wrapper around the other utility functions. The only thing it actually does is convert the query dict to an appropriate JSON string.

Parameters:

Name Type Description Default
df_column Series

A pandas Series to reconcile.

required
type_id str

A string specifying the item type to reconcile against, in Wikidata this corresponds to the 'instance of' property of an item.

required
property_mapping dict

Property-column mapping of the items you want to reconcile against. For example, {"P17": df['country']} to reconcile against items whose country property equals the values in the column country. This function has no default for it, so pass None explicitly when no property mapping is needed.

required
reconciliation_endpoint str

A url to the reconciliation endpoint.

required

Returns:

Name Type Description
tuple

A tuple containing the list of the original values sent to reconciliation and a dictionary (JSON) with the query results.

Source code in reconciler/webutils.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def return_reconciled_raw(
    df_column, type_id, property_mapping, reconciliation_endpoint
):
    """Send reformatted dict for reconciliation

    Thin wrapper over the other utility functions: it builds the query
    dict, splits it into chunks, serialises each chunk to a JSON string,
    sends it with ``perform_query`` and merges the per-chunk answers.

    Args:
        df_column (Series): A pandas Series to reconcile.
        type_id (str): A string specifying the item type to reconcile against,
            in Wikidata this corresponds to the 'instance of' property of an item.
        property_mapping (dict): Property-column mapping of the items you want to
            reconcile against, e.g. {"P17": df['country']}. It is forwarded
            unchanged to ``get_query_dict``.
        reconciliation_endpoint (str): A url to the reconciliation endpoint.

    Returns:
        tuple: The list of the original values sent to reconciliation,
            followed by a dictionary (JSON) with the merged query results.

    """

    input_keys, query_dict = get_query_dict(df_column, type_id, property_mapping)

    # Collect the chunks into a list before handing them to the progress bar.
    batches = list(chunk_dictionary(query_dict))

    responses = []
    for batch in tqdm(batches, position=0, leave=True):
        logger.debug('reconciling: %s', batch)
        # The chunk is JSON-encoded twice: once as the "queries" value and
        # once more as the string handed to perform_query.
        payload = json.dumps({"queries": json.dumps(batch)})
        responses.append(perform_query(payload, reconciliation_endpoint))

    return input_keys, dict(ChainMap(*responses))

parse_raw_results()

Parse JSON query result

Parameters:

Name Type Description Default
input_keys list

A list with the original input values that were used to reconcile.

required
response dict

A dict corresponding to the raw JSON response from the reconciliation API.

required

Returns:

Name Type Description
DataFrame

A Pandas DataFrame with all the results.

Source code in reconciler/webutils.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def parse_raw_results(input_keys, response):
    """
    Parse JSON query result

    Response keys are processed in ascending numeric order and the n-th
    key is paired with ``input_keys[n]``. A query with no candidates
    yields a single placeholder row (``id`` NaN, ``match`` False).

    Args:
        input_keys (list): A list with the original input values
            that were used to reconcile.
        response (dict): A dict corresponding to the raw JSON response
            from the reconciliation API.

    Returns:
        DataFrame: A Pandas DataFrame with all the results. An empty
            DataFrame is returned when ``response`` has no entries.
    """

    res_keys = sorted(response.keys(), key=int)

    dfs = []
    for idx, key in enumerate(res_keys):

        current_df = pd.json_normalize(response[key]["result"])

        if current_df.empty:
            # np.nan rather than np.NaN: the latter was removed in NumPy 2.0.
            current_df = pd.DataFrame(
                {
                    "id": [np.nan],
                    "match": [False],
                }
            )
        else:
            # A missing "features" column must not prevent type extraction.
            current_df = current_df.drop(columns=["features"], errors="ignore")
            try:
                # Build both columns before assigning so that a failure
                # leaves the frame untouched instead of half-updated.
                type_ids = [item[0]["id"] for item in current_df["type"]]
                type_names = [item[0]["name"] for item in current_df["type"]]
            except (IndexError, KeyError):
                # Best effort: keep the raw "type" column when it is absent,
                # empty, or lacks the expected keys.
                pass
            else:
                current_df["type_id"] = type_ids
                current_df["type"] = type_names

        current_df["input_value"] = input_keys[idx]
        dfs.append(current_df)

    if not dfs:
        # pd.concat raises ValueError on an empty list.
        return pd.DataFrame()

    concatenated = pd.concat(dfs)

    return concatenated