24 while (
i < buffer.
size()) {
25 const char c = buffer[
i];
42 approximate_chunk_size = std::max<int64_t>(approximate_chunk_size, 1);
45 while (start < buffer.
size()) {
46 int64_t end = std::min(start + approximate_chunk_size, buffer.
size());
66 r_data_offsets.
clear();
67 r_data_fields.clear();
71 while (start < buffer.
size()) {
72 const std::optional<int64_t> next_record_start = parse_record_fields(
79 if (!next_record_start.has_value()) {
83 if (r_data_fields.size() > r_data_offsets.
last()) {
84 r_data_offsets.
append(r_data_fields.size());
86 start = *next_record_start;
101 const std::optional<int64_t> first_data_record_start = parse_record_fields(
103 if (!first_data_record_start.has_value()) {
108 process_header(
CsvRecord(header_fields));
114 data_buffer,
options.chunk_size_bytes);
121 std::atomic<bool> found_malformed_chunk =
false;
129 TLS &tls = all_tls.local();
130 for (const int64_t i : range) {
131 if (found_malformed_chunk.load(std::memory_order_relaxed)) {
135 const Span<char> chunk_buffer = data_buffer_chunks[i];
136 const std::optional<CsvRecords> records = parse_records(
137 chunk_buffer, options, tls.data_offsets, tls.data_fields);
138 if (!records.has_value()) {
139 found_malformed_chunk.store(true, std::memory_order_relaxed);
142 chunk_results[i] = process_records(*records);
149 if (found_malformed_chunk) {
150 chunk_results.clear();
151 TLS &tls = all_tls.local();
153 data_buffer,
options, tls.data_offsets, tls.data_fields);
154 if (!records.has_value()) {
157 chunk_results.append(process_records(*records));
161 Vector<Any<>> results;
162 for (std::optional<Any<>> &
result : chunk_results) {
181 while (
i <
str.size()) {
182 const char c =
str[
i];
183 if (
options.quote_escape_chars.contains(c)) {
186 unescaped_str[escaped_size++] =
options.quote;
191 unescaped_str[escaped_size++] = c;
201 const char delimiter,
208 const auto handle_potentially_trailing_delimiter = [&](
const int64_t i) {
209 if (
i <= buffer.
size()) {
210 if (
i < buffer.
size()) {
211 if (
ELEM(buffer[
i],
'\n',
'\r')) {
222 while (
i < buffer.
size()) {
223 const char c = buffer[
i];
231 if (c == delimiter) {
234 handle_potentially_trailing_delimiter(
i);
240 buffer,
i, quote, quote_escape_chars);
241 if (!end_of_field.has_value()) {
246 while (
i < buffer.
size()) {
247 const char inner_c = buffer[
i];
248 if (inner_c == quote) {
252 if (inner_c == delimiter) {
254 handle_potentially_trailing_delimiter(
i);
257 if (
ELEM(inner_c,
'\n',
'\r')) {
267 while (
i < buffer.
size()) {
268 const char inner_c = buffer[
i];
269 if (inner_c == delimiter) {
271 handle_potentially_trailing_delimiter(
i);
274 if (
ELEM(inner_c,
'\n',
'\r')) {
281 return buffer.
size();
286 const char delimiter)
289 while (
i < buffer.
size()) {
290 const char c = buffer[
i];
291 if (
ELEM(c, delimiter,
'\n',
'\r')) {
296 return buffer.
size();
305 while (
i < buffer.
size()) {
306 const char c = buffer[
i];
308 if (
i + 1 < buffer.
size() && buffer[
i + 1] == quote) {
#define BLI_assert_unreachable()
void append(const T &value)
static constexpr IndexRange from_begin_end(const int64_t begin, const int64_t end)
MutableSpan< T > allocate_array(int64_t size)
constexpr MutableSpan take_front(const int64_t n) const
constexpr Span drop_front(int64_t n) const
constexpr Span slice(int64_t start, int64_t size) const
constexpr int64_t size() const
constexpr bool contains(const T &value) const
static constexpr int64_t not_found
void append(const T &value)
const T & last(const int64_t n=0) const
IndexRange index_range() const
CCL_NAMESPACE_BEGIN struct Options options
int64_t find_end_of_simple_field(Span< char > buffer, int64_t start, char delimiter)
std::optional< int64_t > parse_record_fields(const Span< char > buffer, const int64_t start, const char delimiter, const char quote, const Span< char > quote_escape_chars, Vector< Span< char > > &r_fields)
std::optional< int64_t > find_end_of_quoted_field(Span< char > buffer, int64_t start, char quote, Span< char > escape_chars)
std::optional< Vector< Any<> > > parse_csv_in_chunks(const Span< char > buffer, const CsvParseOptions &options, FunctionRef< void(const CsvRecord &record)> process_header, FunctionRef< Any<>(const CsvRecords &records)> process_records)
static int64_t guess_next_record_start(const Span< char > buffer, const int64_t start)
StringRef unescape_field(const StringRef str, const CsvParseOptions &options, LinearAllocator<> &allocator)
static Vector< Span< char > > split_into_aligned_chunks(const Span< char > buffer, int64_t approximate_chunk_size)
static std::optional< CsvRecords > parse_records(const Span< char > buffer, const CsvParseOptions &options, Vector< int64_t > &r_data_offsets, Vector< Span< char > > &r_data_fields)
void parallel_for(const IndexRange range, const int64_t grain_size, const Function &function, const TaskSizeHints &size_hints=detail::TaskSizeHints_Static(1))