You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
316 lines
13 KiB
316 lines
13 KiB
# Copyright 2020 The HuggingFace Inc. team.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
SentencePiece-based tokenization class for loading from sentencepiece.model files.
|
|
"""
|
|
|
|
import os
|
|
from shutil import copyfile
|
|
|
|
|
|
try:
|
|
import sentencepiece as spm
|
|
except ImportError:
|
|
spm = None
|
|
|
|
from .convert_slow_tokenizer import import_protobuf
|
|
from .tokenization_python import PreTrainedTokenizer
|
|
from .tokenization_utils_base import (
|
|
INIT_TOKENIZER_DOCSTRING,
|
|
AddedToken,
|
|
generate_merges,
|
|
)
|
|
from .utils import add_end_docstrings, logging, requires_backends
|
|
|
|
|
|
# Module-level logger, obtained through the package's own `logging` helper.
logger = logging.get_logger(__name__)

# Maps the vocabulary-file argument name to the file name used when the vocabulary is saved.
VOCAB_FILES_NAMES = {"vocab_file": "tokenizer.model"}

# Marker character that `convert_tokens_to_string` turns back into a space and that `_tokenize`
# checks for at the start of the input text.
SPIECE_UNDERLINE = "▁"
|
|
|
|
|
|
@add_end_docstrings(INIT_TOKENIZER_DOCSTRING)
class SentencePieceBackend(PreTrainedTokenizer):
    """
    Base class for SentencePiece-based tokenizers that load from sentencepiece.model files.

    Inherits from [`~tokenization_utils.PreTrainedTokenizer`].

    Handles all the shared methods for tokenization and special tokens as well as methods downloading/caching/loading
    pretrained tokenizers as well as adding tokens to the vocabulary.

    This class also contains the added tokens in a unified way on top of all tokenizers so we don't have to handle the
    specific vocabulary augmentation methods of the various underlying dictionary structures (BPE, sentencepiece...).
    """

    vocab_files_names = VOCAB_FILES_NAMES

    def __init__(self, **kwargs):
        """
        Load the SentencePiece model and initialize the tokenizer.

        Keyword arguments read here (everything except `sp_model_kwargs` is also forwarded untouched to the parent
        `__init__`; `sp_model_kwargs` is re-inserted before the call):

        Args:
            vocab_file (`str`):
                Path passed to `SentencePieceProcessor.Load`.
            legacy (`bool`, *optional*, defaults to `True`):
                When `False`, the `add_dummy_prefix` normalizer option of the loaded model is switched off.
            sp_model_kwargs (`dict`, *optional*):
                Keyword arguments for the `SentencePieceProcessor` constructor. `None` is treated as an empty dict.
            backend (`str`, *optional*):
                Set to `"sentencepiece"` when the caller did not provide a value.
        """
        # Ensure optional dependency is available before loading
        requires_backends(self, "sentencepiece")

        # Extract sentencepiece-specific parameters
        self.vocab_file = kwargs.get("vocab_file")
        self.legacy = kwargs.get("legacy", True)
        # An explicit `sp_model_kwargs=None` must behave like "not provided"; unpacking `**None` would raise.
        sp_model_kwargs = kwargs.pop("sp_model_kwargs", None)
        self.sp_model_kwargs = {} if sp_model_kwargs is None else sp_model_kwargs

        # Set backend to "sentencepiece" if not already set
        if "backend" not in kwargs:
            kwargs["backend"] = "sentencepiece"

        # Load the SentencePiece model before calling parent __init__
        # This is needed because parent __init__ may call methods that depend on sp_model
        tokenizer = spm.SentencePieceProcessor(**self.sp_model_kwargs)
        tokenizer.Load(self.vocab_file)

        if not self.legacy:
            # Re-load the model with the dummy-prefix normalization disabled; `_tokenize` compensates for it.
            model_pb2 = import_protobuf()
            proto = model_pb2.ModelProto.FromString(tokenizer.serialized_model_proto())
            if proto.normalizer_spec.add_dummy_prefix:
                proto.normalizer_spec.add_dummy_prefix = False
                tokenizer.LoadFromSerializedProto(proto.SerializeToString())

        self.sp_model = tokenizer

        # Initialize total_vocab_size before parent __init__ (which may call _add_tokens -> len(self))
        self.total_vocab_size = self.sp_model.get_piece_size()

        # Add sp_model_kwargs back to kwargs so it gets stored in init_kwargs
        kwargs["sp_model_kwargs"] = self.sp_model_kwargs

        # Call parent class __init__ (PreTrainedTokenizer)
        # This handles tokens_trie, _added_tokens_decoder, _added_tokens_encoder,
        # token_type_ids_pattern, special_tokens_pattern, and adds special tokens
        super().__init__(**kwargs)
        self._update_trie()

    @property
    def vocab_size(self) -> int:
        """Returns the number of pieces in the SentencePiece model (added tokens are not counted)."""
        return self.sp_model.get_piece_size()

    def get_vocab(self) -> dict[str, int]:
        """Returns the vocabulary as a `token -> id` dict: the base pieces, updated with the added tokens."""
        vocab = {self.convert_ids_to_tokens(i): i for i in range(self.vocab_size)}
        vocab.update(self.added_tokens_encoder)
        return vocab

    def _add_tokens(self, new_tokens: list[str] | list[AddedToken], special_tokens: bool = False) -> int:
        """
        Add a list of new tokens to the tokenizer class. If the new tokens are not in the vocabulary, they are added to
        it with indices starting from length of the current vocabulary. Special tokens are sometimes already in the
        vocab which is why they have to be handled specifically.

        Args:
            new_tokens (`list[str]`or `list[tokenizers.AddedToken]`):
                Token(s) to add in vocabulary. A token is counted as added if it's not already in the vocabulary
                (tested by checking that the SentencePiece model does not map it back to the same piece). If a token
                is part of the vocabulary then we simply mark this token as an `AddedToken` which allows to control
                the stripping and normalization of this token. This is NOT possible in `tokenizers`.
            special_tokens (`bool`, *optional*, defaults to `False`):
                Whether or not the tokens should be added as special tokens.

        Returns:
            `int`: The number of tokens actually added to the vocabulary.

        Raises:
            TypeError: If an element of `new_tokens` is neither a `str` nor an `AddedToken`.

        Examples:

        ```python
        # Let's see how to increase the vocabulary of Bert model and tokenizer
        tokenizer = BertTokenizer.from_pretrained("google-bert/bert-base-uncased")
        model = BertModel.from_pretrained("google-bert/bert-base-uncased")

        num_added_toks = tokenizer.add_tokens(["new_tok1", "my_new-tok2"])
        print("We have added", num_added_toks, "tokens")
        # Note: resize_token_embeddings expects to receive the full size of the new vocabulary, i.e. the length of the tokenizer.
        model.resize_token_embeddings(len(tokenizer))
        ```"""
        if not new_tokens:
            return 0

        next_index = len(self)  # total size (base + added)
        # The SentencePiece model is not modified in the loop, so its size is loop-invariant.
        base_vocab_size = self.sp_model.get_piece_size()
        num_added = 0
        for token in new_tokens:
            if not isinstance(token, (str, AddedToken)):
                raise TypeError(f"Token {token} is not a string but a {type(token)}.")
            if str(token) == "":
                continue
            if isinstance(token, str):
                if token in self._added_tokens_encoder:
                    continue
                is_special = token in self.all_special_tokens or special_tokens
                token = AddedToken(token, rstrip=False, lstrip=False, normalized=not is_special, special=is_special)
            elif special_tokens:
                # doing token.special=True changes the normalization! will fix in rust
                # this is important and the only reason why the AddedTokens in each class are normalized by default
                token.__setstate__({"special": True, "normalized": token.normalized})

            if token in self._added_tokens_decoder.values():
                continue
            if not token.special and token.normalized and getattr(self, "do_lower_case", False):
                token.content = token.content.lower()

            # Check if token already exists in the SentencePiece base vocab: the piece must round-trip
            # through its id, otherwise `piece_to_id` only returned a fallback id.
            tok_id = self.sp_model.piece_to_id(token.content)
            in_base_vocab = tok_id < base_vocab_size and self.sp_model.IdToPiece(tok_id) == token.content

            if in_base_vocab:
                # Reuse the existing id; the token is registered as an AddedToken but not counted as new.
                token_index = tok_id
            else:
                token_index = next_index
                next_index += 1
                num_added += 1

            if token.special and str(token) not in self.all_special_tokens:
                self._extra_special_tokens.append(token)
            # Keep the id -> token and content -> id maps in sync.
            self._added_tokens_decoder[token_index] = token
            self._added_tokens_encoder[token.content] = token_index
            if self.verbose:
                logger.info(f"Adding {token} to the vocabulary")

        self._update_trie()
        self._update_total_vocab_size()
        return num_added

    def _update_trie(self, unique_no_split_tokens: list[str] | None = None):
        """
        Make sure `self.tokens_trie` contains every added token, every special token and, when given, every entry
        of `unique_no_split_tokens`. Tokens already present in the trie are not inserted again.
        """
        # Add all added tokens
        for token in self._added_tokens_decoder.values():
            if token.content not in self.tokens_trie._tokens:
                self.tokens_trie.add(token.content)
        # Also add all special tokens (even if they're in base vocab) so they get split during tokenization
        for token in self.all_special_tokens:
            if token not in self.tokens_trie._tokens:
                self.tokens_trie.add(token)
        # Add any additional no-split tokens
        for token in unique_no_split_tokens or []:
            if token not in self.tokens_trie._tokens:
                self.tokens_trie.add(token)

    def _tokenize(self, text: str, **kwargs) -> list[str]:
        """
        Returns a tokenized string.

        In legacy mode, or when `text` does not start with `SPIECE_UNDERLINE` or a space, the text is encoded
        directly.

        Otherwise: we de-activated the `add_dummy_prefix` option, thus the sentencepiece internals will always strip
        any SPIECE_UNDERLINE. For example: `self.sp_model.encode(f"{SPIECE_UNDERLINE}Hey", out_type = str)` will give
        `['H', 'e', 'y']` instead of `['▁He', 'y']`. Thus we always encode `f"{unk_token}text"` and strip the
        `unk_token`. Here is an example with `unk_token = "<unk>"` and `unk_token_length = 4`.
        `self.tokenizer.sp_model.encode("<unk> Hey", out_type = str)[4:]`.
        """
        if self.legacy or not text.startswith((SPIECE_UNDERLINE, " ")):
            return self.sp_model.encode(text, out_type=str)

        # 1. Encode string + prefix ex: "<unk> Hey"
        tokens = self.sp_model.encode(self.unk_token + text, out_type=str)
        # 2. Remove self.unk_token from ['<','unk','>', '▁Hey']
        unk_token_length = len(self.sp_model.encode(str(self.unk_token)))
        return tokens[unk_token_length:] if len(tokens) >= unk_token_length else tokens

    def _convert_token_to_id(self, token: str) -> int:
        """Converts a token (str) to an id using the vocab."""
        return self.sp_model.piece_to_id(token)

    def _convert_id_to_token(self, index: int) -> str:
        """Converts an index (integer) to a token (str) using the vocab."""
        return self.sp_model.IdToPiece(index)

    def convert_tokens_to_string(self, tokens: list[str]) -> str:
        """Joins the tokens, replaces every `SPIECE_UNDERLINE` with a space and strips the surrounding whitespace."""
        return "".join(tokens).replace(SPIECE_UNDERLINE, " ").strip()

    def save_vocabulary(self, save_directory: str, filename_prefix: str | None = None) -> tuple[str]:
        """
        Save the sentencepiece vocabulary to a directory.

        The original model file is copied when it still exists on disk; otherwise the in-memory model is serialized.
        Nothing is written when the original file already is the target file.

        Args:
            save_directory (`str`):
                The directory in which to save the vocabulary.
            filename_prefix (`str`, *optional*):
                An optional prefix to add to the names of the saved files.

        Returns:
            `tuple(str)`: Paths to the files saved. `None` is returned (after logging an error) when
            `save_directory` is not an existing directory.
        """
        if not os.path.isdir(save_directory):
            logger.error(f"Vocabulary path ({save_directory}) should be a directory")
            return
        out_vocab_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + self.vocab_files_names["vocab_file"]
        )

        source_file = self.vocab_file
        # Guard against a missing path so that `os.path` is never called with `None`.
        source_exists = source_file is not None and os.path.isfile(source_file)
        if not source_exists:
            # No file to copy from: dump the model currently held in memory.
            with open(out_vocab_file, "wb") as fi:
                fi.write(self.sp_model.serialized_model_proto())
        elif os.path.abspath(source_file) != os.path.abspath(out_vocab_file):
            copyfile(source_file, out_vocab_file)

        return (out_vocab_file,)

    def _decode(
        self,
        token_ids: int | list[int],
        skip_special_tokens: bool = False,
        clean_up_tokenization_spaces: bool | None = None,
        spaces_between_special_tokens: bool = False,
        **kwargs,
    ) -> str:
        """
        Decode token ids to string.

        Uses the generic decode path from PreTrainedTokenizer which works for all vocabularies,
        including custom vocabularies that override _convert_id_to_token.

        `spaces_between_special_tokens` is accepted but not forwarded to the parent implementation.
        NOTE(review): presumably kept only for signature compatibility - confirm against the parent `_decode`.
        """
        # Use parent class's generic decode method - it's simpler and works for all cases
        return super()._decode(
            token_ids=token_ids,
            skip_special_tokens=skip_special_tokens,
            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
            **kwargs,
        )
|
|
|
|
|
|
class SentencePieceExtractor:
    """
    Extractor implementation for SentencePiece trained models. https://github.com/google/sentencepiece
    """

    def __init__(self, model: str):
        """
        Load the SentencePiece model stored at `model`.

        Args:
            model (`str`): Path handed to `SentencePieceProcessor.Load`.
        """
        requires_backends(self, "sentencepiece")
        # Imported lazily so the module can be imported without sentencepiece installed.
        from sentencepiece import SentencePieceProcessor

        self.sp = SentencePieceProcessor()
        self.sp.Load(model)

    def extract(self, vocab_scores=None) -> tuple[dict[str, int], list[tuple[str, float]], list[tuple]]:
        """
        Return the vocabulary, the piece scores and the merges of the loaded model.

        Args:
            vocab_scores (*optional*):
                Currently not read by this method; the merges are always generated from the model's own piece
                scores. Kept in the signature for backward compatibility.

        Returns:
            A 3-tuple `(vocab_ids, vocab_scores_list, merges)` where `vocab_ids` maps each piece to its id,
            `vocab_scores_list` is the list of `(piece, score)` pairs in id order, and `merges` is the result of
            `generate_merges(vocab_ids, <piece -> score dict>)`.
        """
        sp = self.sp
        # Walk the model once and derive the other views from this list instead of re-querying
        # the processor for every view.
        vocab_scores_list = [(sp.id_to_piece(index), sp.get_score(index)) for index in range(sp.GetPieceSize())]
        vocab_ids = {piece: index for index, (piece, _) in enumerate(vocab_scores_list)}
        vocab_scores_dict = dict(vocab_scores_list)

        merges = generate_merges(vocab_ids, vocab_scores_dict)

        return vocab_ids, vocab_scores_list, merges
|