/* NOTE(review): the leading integers (304, 305, ...) on these lines are artifacts of a
 * mangled paste (original file line numbers fused into the code); they are not valid C++
 * and should be stripped when restoring the file. */
/* Parameters of the current execution; non-owning, valid only while executing. */
304 Params *params_ =
nullptr;
/* Context of the current execution; non-owning. */
305 const Context *context_ =
nullptr;
/* Task pool used for multi-threaded execution; stays null while single-threaded
 * (see use_multi_threading() below, which tests exactly this). */
310 std::atomic<TaskPool *> task_pool_ =
nullptr;
311#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
/* Thread that drives the execution; used by debug-only checks. */
312 std::thread::id current_main_thread_;
/* Per-thread storage; the struct is truncated in this chunk (an `allocator` member is
 * referenced later in get_local_data but its declaration is not visible here). */
315 struct ThreadLocalStorage {
/* Lazily created per-thread user data (see get_local_data). */
321 std::optional<destruct_ptr<LocalUserData>> local_user_data;
/* Created on demand once multi-threading is enabled (see ensure_thread_locals). */
323 std::unique_ptr<threading::EnumerableThreadSpecific<ThreadLocalStorage>> thread_locals_;
/* True until the executor ran once; gates one-time initialization in execute(). */
328 bool is_first_execution_ =
true;
330 friend GraphExecutorLFParams;
/* Constructor fragment: node indices must be valid because node_states_ is indexed
 * by Node::index_in_graph() throughout this file. */
347 BLI_assert(self_.graph_.node_indices_are_valid());
/* Destructor fragment: destruct every node state. Presumably this loop body sits
 * inside a parallel-for over index ranges (`range` is not declared in the visible
 * lines) — TODO confirm against the original file. */
356 for (const int node_index : range) {
357 const Node &node = *self_.graph_.nodes()[node_index];
358 NodeState &node_state = *node_states_[node_index];
359 this->destruct_node_state(node, node_state);
/* Fragment of the main execute() entry point; its signature and several interior
 * lines are missing from this chunk. Leading integers are paste artifacts. */
371#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
/* Remember which thread drives the execution, for debug-only checks elsewhere. */
372 current_main_thread_ = std::this_thread::get_id();
/* Deferred cleanup that runs when execution is done. */
374 const auto deferred_func = [&]() {
378 is_first_execution_ =
false;
379#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
380 current_main_thread_ = {};
/* One-time setup: a single buffer holds all node states plus the loaded-inputs flags
 * (offsets come from self_.init_buffer_info_). */
388 if (is_first_execution_) {
390 char *buffer =
static_cast<char *
>(
391 local_data.allocator->
allocate(self_.init_buffer_info_.total_size,
alignof(
void *)));
392 this->initialize_node_states(buffer);
/* loaded_inputs_ is a span of atomic flags placed inside the same buffer. */
395 reinterpret_cast<std::atomic<uint8_t> *
>(
396 buffer + self_.init_buffer_info_.loaded_inputs_array_offset),
397 self_.graph_inputs_.size()};
/* Zero all flags; assumes sizeof(bool) == 1 matches the uint8_t elements — the
 * mismatch between sizeof(bool) and the element type is suspicious but harmless
 * on common ABIs. TODO confirm. */
399 memset(
static_cast<void *
>(loaded_inputs_.data()), 0, loaded_inputs_.size() *
sizeof(
bool));
401 this->set_always_unused_graph_inputs();
402 this->set_defaulted_graph_outputs(local_data);
/* Ask the provider (if any) which nodes must run regardless of output usage. */
406 if (self_.side_effect_provider_ !=
nullptr) {
407 side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(context);
/* Presumably a validation loop over the returned nodes — the loop header is
 * missing in this chunk. */
409 BLI_assert(self_.graph_.nodes().contains(node));
410 const int node_index = node->index_in_graph();
411 NodeState &node_state = *node_states_[node_index];
416 this->initialize_static_value_usages(side_effect_nodes);
417 this->schedule_side_effect_nodes(side_effect_nodes, current_task, local_data);
/* Every execution: react to newly requested outputs and newly provided inputs,
 * then drain the scheduled-nodes queue. */
420 this->schedule_for_new_output_usages(current_task, local_data);
421 this->forward_newly_provided_inputs(current_task, local_data);
423 this->run_task(current_task, local_data);
/* Carves per-node state objects out of one preallocated buffer. The enclosing
 * MutableSpan assignment and the parallel-for wrapper around the loop are missing
 * from this chunk; only the per-node construction is visible. */
431 void initialize_node_states(
char *buffer)
/* node_states_ points into the shared buffer at a precomputed offset. */
435 reinterpret_cast<NodeState **
>(buffer + self_.init_buffer_info_.node_states_array_offset),
439 for (const int i : range) {
440 const Node &node = *nodes[i];
/* Each node gets its own sub-range of the buffer (offset precomputed per node). */
441 char *memory = buffer + self_.init_buffer_info_.node_states_offsets[i];
/* Placement-construct the NodeState first, then its input/output arrays directly
 * behind it in the same allocation. */
444 NodeState *node_state = reinterpret_cast<NodeState *>(memory);
445 memory += sizeof(NodeState);
446 new (node_state) NodeState();
449 const int num_inputs = node.inputs().size();
450 const int num_outputs = node.outputs().size();
451 node_state->inputs = reinterpret_cast<InputState *>(memory);
452 memory += sizeof(InputState) * num_inputs;
453 node_state->outputs = reinterpret_cast<OutputState *>(memory);
455 default_construct_n(node_state->inputs, num_inputs);
456 default_construct_n(node_state->outputs, num_outputs);
458 node_states_[i] = node_state;
463 void destruct_node_state(
const Node &node, NodeState &node_state)
465 if (node.is_function()) {
466 const LazyFunction &fn =
static_cast<const FunctionNode &
>(node).function();
467 if (node_state.storage !=
nullptr) {
468 fn.destruct_storage(node_state.storage);
471 for (
const int i : node.inputs().index_range()) {
472 InputState &input_state = node_state.inputs[
i];
473 const InputSocket &input_socket = node.input(
i);
474 this->destruct_input_value_if_exists(input_state, input_socket.type());
476 std::destroy_at(&node_state);
482 void schedule_for_new_output_usages(CurrentTask ¤t_task,
const LocalData &local_data)
484 for (
const int graph_output_index : self_.graph_outputs_.index_range()) {
485 if (params_->output_was_set(graph_output_index)) {
488 const ValueUsage output_usage = params_->get_output_usage(graph_output_index);
489 if (output_usage == ValueUsage::Maybe) {
492 const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
493 const Node &node = socket.node();
494 NodeState &node_state = *node_states_[node.index_in_graph()];
495 this->with_locked_node(
496 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
497 if (output_usage == ValueUsage::Used) {
498 this->set_input_required(locked_node, socket);
501 this->set_input_unused(locked_node, socket);
507 void set_defaulted_graph_outputs(
const LocalData &local_data)
509 for (
const int graph_output_index : self_.graph_outputs_.index_range()) {
510 const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
511 if (socket.origin() !=
nullptr) {
514 const CPPType &type = socket.type();
518 if (self_.logger_ !=
nullptr) {
519 const Context
context{context_->storage, context_->user_data, local_data.local_user_data};
520 self_.logger_->log_socket_value(socket, {type, default_value},
context);
523 void *output_ptr = params_->get_output_data_ptr(graph_output_index);
525 params_->output_set(graph_output_index);
529 void set_always_unused_graph_inputs()
531 for (
const int i : self_.graph_inputs_.index_range()) {
532 const OutputSocket &socket = *self_.graph_inputs_[
i];
533 const Node &node = socket.node();
534 const NodeState &node_state = *node_states_[node.index_in_graph()];
535 const OutputState &output_state = node_state.outputs[socket.index()];
536 if (output_state.usage == ValueUsage::Unused) {
537 params_->set_input_unused(
i);
/* Fragment of initialize_static_value_usages(): the signature and local declarations
 * (`reachable_node_flags`, `reachable_nodes_to_check`, `all_nodes`, `use_count`) are
 * missing from this chunk. Marks which nodes can affect an observable result and
 * statically sets usages for everything else. */
/* Seed the reachable set with the nodes computing graph outputs. */
558 for (
const InputSocket *socket : self_.graph_outputs_) {
559 const Node &node = socket->node();
560 const int node_index = node.index_in_graph();
561 if (!reachable_node_flags[node_index]) {
562 reachable_node_flags[node_index] =
true;
563 reachable_nodes_to_check.
push(&node);
/* Side-effect nodes are observable even when no output depends on them. */
568 for (
const FunctionNode *node : side_effect_nodes) {
569 const int node_index = node->index_in_graph();
570 reachable_node_flags[node_index] =
true;
571 reachable_nodes_to_check.
push(node);
/* Walk edges backwards (input -> origin) to mark all transitive dependencies. */
575 while (!reachable_nodes_to_check.
is_empty()) {
576 const Node &node = *reachable_nodes_to_check.
pop();
577 for (
const InputSocket *input_socket : node.inputs()) {
578 const OutputSocket *origin_socket = input_socket->origin();
579 if (origin_socket !=
nullptr) {
580 const Node &origin_node = origin_socket->node();
581 const int origin_node_index = origin_node.index_in_graph();
582 if (!reachable_node_flags[origin_node_index]) {
583 reachable_node_flags[origin_node_index] =
true;
584 reachable_nodes_to_check.
push(&origin_node);
/* Derive static usage info from reachability. */
590 for (
const int node_index : reachable_node_flags.index_range()) {
591 const Node &node = *all_nodes[node_index];
592 NodeState &node_state = *node_states_[node_index];
593 const bool node_is_reachable = reachable_node_flags[node_index];
594 if (node_is_reachable) {
595 for (
const int output_index : node.outputs().index_range()) {
596 const OutputSocket &output_socket = node.output(output_index);
597 OutputState &output_state = node_state.outputs[output_index];
/* Count targets on reachable nodes; the counting lines (use_count declaration and
 * increment) are missing from this chunk — TODO confirm. */
599 for (
const InputSocket *target_socket : output_socket.targets()) {
600 const Node &target_node = target_socket->node();
601 const bool target_is_reachable = reachable_node_flags[target_node.index_in_graph()];
603 if (target_is_reachable) {
607 output_state.potential_target_sockets = use_count;
/* An output with no potential users is statically unused. */
608 if (use_count == 0) {
609 output_state.usage = ValueUsage::Unused;
/* Unreachable nodes never run, so all their inputs are unused. */
615 for (
const int input_index : node.inputs().index_range()) {
616 node_state.inputs[input_index].usage = ValueUsage::Unused;
623 CurrentTask ¤t_task,
624 const LocalData &local_data)
626 for (
const FunctionNode *node : side_effect_nodes) {
627 NodeState &node_state = *node_states_[node->index_in_graph()];
628 this->with_locked_node(
629 *node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
630 this->schedule_node(locked_node, current_task,
false);
635 void forward_newly_provided_inputs(CurrentTask ¤t_task,
const LocalData &local_data)
637 for (
const int graph_input_index : self_.graph_inputs_.index_range()) {
638 std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
639 if (was_loaded.load()) {
642 void *input_data = params_->try_get_input_data_ptr(graph_input_index);
643 if (input_data ==
nullptr) {
646 if (was_loaded.fetch_or(1)) {
650 this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
654 void forward_newly_provided_input(CurrentTask ¤t_task,
655 const LocalData &local_data,
656 const int graph_input_index,
659 const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
660 const CPPType &type = socket.type();
661 void *buffer = local_data.allocator->allocate(type);
663 this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task, local_data);
666 void notify_output_required(
const OutputSocket &socket,
667 CurrentTask ¤t_task,
668 const LocalData &local_data)
670 const Node &node = socket.node();
671 const int index_in_node = socket.index();
672 NodeState &node_state = *node_states_[node.index_in_graph()];
673 OutputState &output_state = node_state.outputs[index_in_node];
677 if (node.is_interface()) {
678 const int graph_input_index = self_.graph_input_index_by_socket_index_[socket.index()];
679 std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
680 if (was_loaded.load()) {
683 void *input_data = params_->try_get_input_data_ptr_or_request(graph_input_index);
684 if (input_data ==
nullptr) {
687 if (was_loaded.fetch_or(1)) {
691 this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
696 this->with_locked_node(
697 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
698 if (output_state.usage == ValueUsage::Used) {
701 output_state.usage = ValueUsage::Used;
702 this->schedule_node(locked_node, current_task,
false);
706 void notify_output_unused(
const OutputSocket &socket,
707 CurrentTask ¤t_task,
708 const LocalData &local_data)
710 const Node &node = socket.node();
711 const int index_in_node = socket.index();
712 NodeState &node_state = *node_states_[node.index_in_graph()];
713 OutputState &output_state = node_state.outputs[index_in_node];
715 this->with_locked_node(
716 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
717 output_state.potential_target_sockets -= 1;
718 if (output_state.potential_target_sockets == 0) {
719 BLI_assert(output_state.usage != ValueUsage::Unused);
720 if (output_state.usage == ValueUsage::Maybe) {
721 output_state.usage = ValueUsage::Unused;
722 if (node.is_interface()) {
723 const int graph_input_index =
724 self_.graph_input_index_by_socket_index_[socket.index()];
725 params_->set_input_unused(graph_input_index);
730 this->schedule_node(locked_node, current_task,
true);
737 void schedule_node(LockedNode &locked_node, CurrentTask ¤t_task,
const bool is_priority)
740 switch (locked_node.node_state.schedule_state) {
741 case NodeScheduleState::NotScheduled: {
742 locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
743 const FunctionNode &node =
static_cast<const FunctionNode &
>(locked_node.node);
744 if (this->use_multi_threading()) {
745 std::lock_guard
lock{current_task.mutex};
746 current_task.scheduled_nodes.schedule(node, is_priority);
749 current_task.scheduled_nodes.schedule(node, is_priority);
751 current_task.has_scheduled_nodes.store(
true, std::memory_order_relaxed);
754 case NodeScheduleState::Scheduled: {
757 case NodeScheduleState::Running: {
758 locked_node.node_state.schedule_state = NodeScheduleState::RunningAndRescheduled;
761 case NodeScheduleState::RunningAndRescheduled: {
767 void with_locked_node(
const Node &node,
768 NodeState &node_state,
769 CurrentTask ¤t_task,
770 const LocalData &local_data,
773 BLI_assert(&node_state == node_states_[node.index_in_graph()]);
775 LockedNode locked_node{node, node_state};
776 if (this->use_multi_threading()) {
777 std::lock_guard
lock{node_state.mutex};
778 threading::isolate_task([&]() { f(locked_node); });
784 this->send_output_required_notifications(
785 locked_node.delayed_required_outputs, current_task, local_data);
786 this->send_output_unused_notifications(
787 locked_node.delayed_unused_outputs, current_task, local_data);
791 CurrentTask ¤t_task,
792 const LocalData &local_data)
794 for (
const OutputSocket *socket : sockets) {
795 this->notify_output_required(*socket, current_task, local_data);
800 CurrentTask ¤t_task,
801 const LocalData &local_data)
803 for (
const OutputSocket *socket : sockets) {
804 this->notify_output_unused(*socket, current_task, local_data);
808 void run_task(CurrentTask ¤t_task,
const LocalData &local_data)
810 while (
const FunctionNode *node = current_task.scheduled_nodes.pop_next_node()) {
811 if (current_task.scheduled_nodes.is_empty()) {
812 current_task.has_scheduled_nodes.store(
false, std::memory_order_relaxed);
814 this->run_node_task(*node, current_task, local_data);
818 if (current_task.scheduled_nodes.nodes_num() > 128) {
819 if (this->try_enable_multi_threading()) {
820 std::unique_ptr<ScheduledNodes> split_nodes = std::make_unique<ScheduledNodes>();
821 current_task.scheduled_nodes.split_into(*split_nodes);
822 this->push_to_task_pool(std::move(split_nodes));
/* Runs one scheduled node: under the node lock, decide whether execution is needed and
 * prepare inputs; execute outside the lock; then finish/reschedule under the lock
 * again. Many interior lines (early returns, braces, `fn_inputs`/`default_value`
 * declarations) are missing from this mangled chunk — comments below hedge where the
 * visible code does not show the full logic.
 * NOTE(review): `¤t_task` is mojibake for `&current_task`. */
828 void run_node_task(
const FunctionNode &node,
829 CurrentTask ¤t_task,
830 const LocalData &local_data)
832 NodeState &node_state = *node_states_[node.index_in_graph()];
833 LinearAllocator<> &allocator = *local_data.allocator;
834 Context local_context{context_->storage, context_->user_data, local_data.local_user_data};
835 const LazyFunction &fn = node.function();
837 bool node_needs_execution =
false;
/* Phase 1 (locked): decide whether the node must execute and request inputs. */
838 this->with_locked_node(
839 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
840 BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
841 node_state.schedule_state = NodeScheduleState::Running;
/* Finished nodes don't execute again (the early-out line is missing here). */
843 if (node_state.node_has_finished) {
/* Snapshot output usages for the execution and check whether any required
 * output is still uncomputed. */
847 bool required_uncomputed_output_exists =
false;
848 for (
const int output_index : node.outputs().index_range()) {
849 OutputState &output_state = node_state.outputs[output_index];
850 output_state.usage_for_execution = output_state.usage;
851 if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
852 required_uncomputed_output_exists =
true;
/* Without required outputs or side effects, nothing to do. */
855 if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
/* Request all statically-always-used inputs once. `fn_inputs` is declared on a
 * missing line — presumably `fn.inputs()`; TODO confirm. */
859 if (!node_state.always_used_inputs_requested) {
862 for (
const int input_index : fn_inputs.
index_range()) {
863 const Input &fn_input = fn_inputs[input_index];
864 if (fn_input.usage == ValueUsage::Used) {
865 const InputSocket &input_socket = node.input(input_index);
866 if (input_socket.origin() !=
nullptr) {
867 this->set_input_required(locked_node, input_socket);
872 node_state.always_used_inputs_requested =
true;
/* Mark inputs that already have values as ready; bail out if a required input is
 * still missing and the function cannot handle that (the bail-out line itself is
 * missing here). */
875 for (
const int input_index : node.inputs().index_range()) {
876 InputState &input_state = node_state.inputs[input_index];
877 if (input_state.was_ready_for_execution) {
880 if (input_state.value !=
nullptr) {
881 input_state.was_ready_for_execution =
true;
884 if (!fn.allow_missing_requested_inputs()) {
885 if (input_state.usage == ValueUsage::Used) {
891 node_needs_execution =
true;
/* Lazy one-time init: node storage and unlinked-input default values. */
894 if (node_needs_execution) {
895 if (!node_state.storage_and_defaults_initialized) {
897 node_state.storage = fn.init_storage(allocator);
900 for (
const int input_index : node.inputs().index_range()) {
901 const InputSocket &input_socket = node.input(input_index);
902 if (input_socket.origin() !=
nullptr) {
905 InputState &input_state = node_state.inputs[input_index];
906 const CPPType &type = input_socket.type();
/* `default_value` is declared on a missing line — presumably the socket default. */
909 if (self_.logger_ !=
nullptr) {
910 self_.logger_->log_socket_value(input_socket, {type, default_value}, local_context);
913 input_state.value = allocator.allocate(type);
915 input_state.was_ready_for_execution =
true;
918 node_state.storage_and_defaults_initialized =
true;
/* Phase 2 (unlocked): run the function. */
924 this->execute_node(node, node_state, current_task, local_data);
/* Phase 3 (locked): verify outputs, finish and possibly reschedule. */
927 this->with_locked_node(
928 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
930 if (node_needs_execution) {
931 this->assert_expected_outputs_have_been_computed(locked_node, local_data);
934 this->finish_node_if_possible(locked_node);
935 const bool reschedule_requested = node_state.schedule_state ==
936 NodeScheduleState::RunningAndRescheduled;
937 node_state.schedule_state = NodeScheduleState::NotScheduled;
938 if (reschedule_requested && !node_state.node_has_finished) {
939 this->schedule_node(locked_node, current_task,
false);
944 void assert_expected_outputs_have_been_computed(LockedNode &locked_node,
945 const LocalData &local_data)
947 const FunctionNode &node =
static_cast<const FunctionNode &
>(locked_node.node);
948 const NodeState &node_state = locked_node.node_state;
950 if (node_state.missing_required_inputs > 0) {
953 if (node_state.schedule_state == NodeScheduleState::RunningAndRescheduled) {
957 for (
const int i : node.outputs().index_range()) {
958 const OutputState &output_state = node_state.outputs[
i];
959 if (output_state.usage_for_execution == ValueUsage::Used) {
960 if (!output_state.has_been_computed) {
961 missing_outputs.
append(&node.output(
i));
966 if (self_.logger_ !=
nullptr) {
967 const Context
context{context_->storage, context_->user_data, local_data.local_user_data};
968 self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, context);
974 void finish_node_if_possible(LockedNode &locked_node)
976 const Node &node = locked_node.node;
977 NodeState &node_state = locked_node.node_state;
979 if (node_state.node_has_finished) {
984 for (
const int output_index : node.outputs().index_range()) {
985 const OutputState &output_state = node_state.outputs[output_index];
986 if (output_state.usage != ValueUsage::Unused && !output_state.has_been_computed) {
991 for (
const int input_index : node.inputs().index_range()) {
992 const InputState &input_state = node_state.inputs[input_index];
993 if (input_state.usage == ValueUsage::Used && !input_state.was_ready_for_execution) {
998 node_state.node_has_finished =
true;
1000 for (
const int input_index : node.inputs().index_range()) {
1001 const InputSocket &input_socket = node.input(input_index);
1002 InputState &input_state = node_state.inputs[input_index];
1003 if (input_state.usage == ValueUsage::Maybe) {
1004 this->set_input_unused(locked_node, input_socket);
1006 else if (input_state.usage == ValueUsage::Used) {
1007 this->destruct_input_value_if_exists(input_state, input_socket.type());
1011 if (node_state.storage !=
nullptr) {
1012 if (node.is_function()) {
1013 const FunctionNode &fn_node =
static_cast<const FunctionNode &
>(node);
1014 fn_node.function().destruct_storage(node_state.storage);
1016 node_state.storage =
nullptr;
1020 void destruct_input_value_if_exists(InputState &input_state,
const CPPType &type)
1022 if (input_state.value !=
nullptr) {
1024 input_state.value =
nullptr;
1028 void execute_node(
const FunctionNode &node,
1029 NodeState &node_state,
1030 CurrentTask ¤t_task,
1031 const LocalData &local_data);
1033 void set_input_unused_during_execution(
const Node &node,
1034 NodeState &node_state,
1035 const int input_index,
1036 CurrentTask ¤t_task,
1037 const LocalData &local_data)
1039 const InputSocket &input_socket = node.input(input_index);
1040 this->with_locked_node(
1041 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
1042 this->set_input_unused(locked_node, input_socket);
1046 void set_input_unused(LockedNode &locked_node,
const InputSocket &input_socket)
1048 NodeState &node_state = locked_node.node_state;
1049 const int input_index = input_socket.index();
1050 InputState &input_state = node_state.inputs[input_index];
1052 BLI_assert(input_state.usage != ValueUsage::Used);
1053 if (input_state.usage == ValueUsage::Unused) {
1056 input_state.usage = ValueUsage::Unused;
1058 this->destruct_input_value_if_exists(input_state, input_socket.type());
1059 if (input_state.was_ready_for_execution) {
1062 const OutputSocket *origin = input_socket.origin();
1063 if (origin !=
nullptr) {
1064 locked_node.delayed_unused_outputs.append(origin);
1068 void *set_input_required_during_execution(
const Node &node,
1069 NodeState &node_state,
1070 const int input_index,
1071 CurrentTask ¤t_task,
1072 const LocalData &local_data)
1074 const InputSocket &input_socket = node.input(input_index);
1076 this->with_locked_node(
1077 node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
1078 result = this->set_input_required(locked_node, input_socket);
1083 void *set_input_required(LockedNode &locked_node,
const InputSocket &input_socket)
1085 BLI_assert(&locked_node.node == &input_socket.node());
1086 NodeState &node_state = locked_node.node_state;
1087 const int input_index = input_socket.index();
1088 InputState &input_state = node_state.inputs[input_index];
1090 BLI_assert(input_state.usage != ValueUsage::Unused);
1092 if (input_state.value !=
nullptr) {
1093 input_state.was_ready_for_execution =
true;
1094 return input_state.value;
1096 if (input_state.usage == ValueUsage::Used) {
1099 input_state.usage = ValueUsage::Used;
1100 node_state.missing_required_inputs += 1;
1102 const OutputSocket *origin_socket = input_socket.origin();
1105 locked_node.delayed_required_outputs.append(origin_socket);
/* Forwards one computed value to every linked target input, copying for all but the
 * last target (which receives the value by move). Several interior lines (the
 * `targets` declaration, copy/move into output buffers, `continue`s and braces) are
 * missing from this mangled chunk. NOTE(review): `¤t_task` is mojibake for
 * `&current_task`. */
1109 void forward_value_to_linked_inputs(
const OutputSocket &from_socket,
1110 GMutablePointer value_to_forward,
1111 CurrentTask ¤t_task,
1112 const LocalData &local_data)
1114 BLI_assert(value_to_forward.get() !=
nullptr);
1115 const CPPType &type = *value_to_forward.type();
1116 const Context local_context{
1117 context_->storage, context_->user_data, local_data.local_user_data};
1119 if (self_.logger_ !=
nullptr) {
1120 self_.logger_->log_socket_value(from_socket, value_to_forward, local_context);
/* `targets` is declared on a missing line — presumably `from_socket.targets()`. */
1124 for (
const InputSocket *target_socket : targets) {
1125 const Node &target_node = target_socket->node();
1126 NodeState &node_state = *node_states_[target_node.index_in_graph()];
1127 const int input_index = target_socket->index();
1128 InputState &input_state = node_state.inputs[input_index];
/* The last target may take ownership of the value instead of copying it. */
1129 const bool is_last_target = target_socket == targets.last();
/* Setting an input twice is a bug; dump state for debugging if it happens. */
1131 if (input_state.value !=
nullptr) {
1132 if (self_.logger_ !=
nullptr) {
1133 self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, local_context);
1138 BLI_assert(!input_state.was_ready_for_execution);
1140 BLI_assert(target_socket->origin() == &from_socket);
1142 if (self_.logger_ !=
nullptr) {
1143 self_.logger_->log_socket_value(*target_socket, value_to_forward, local_context);
/* Target may be a graph output: write straight into the caller's buffer. */
1145 if (target_node.is_interface()) {
1147 const int graph_output_index =
1148 self_.graph_output_index_by_socket_index_[target_socket->index()];
1149 if (graph_output_index != -1 &&
1150 params_->get_output_usage(graph_output_index) != ValueUsage::Unused)
1152 void *dst_buffer = params_->get_output_data_ptr(graph_output_index);
/* The move/copy into dst_buffer is on missing lines — presumably move for the
 * last target and copy otherwise; TODO confirm. */
1153 if (is_last_target) {
1159 params_->output_set(graph_output_index);
/* Regular node input: hand the value over under the target node's lock. */
1163 this->with_locked_node(
1164 target_node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
/* Unused inputs don't receive the value (the skip itself is on a missing line). */
1165 if (input_state.usage == ValueUsage::Unused) {
1168 if (is_last_target) {
/* Last target takes ownership of the forwarded buffer. */
1170 this->forward_value_to_input(
1171 locked_node, input_state, value_to_forward, current_task);
1172 value_to_forward = {};
/* Other targets get their own copy in a fresh allocation. */
1175 void *buffer = local_data.allocator->allocate(type);
1177 this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
/* If no target consumed the value (e.g. all targets unused), destruct it. */
1181 if (value_to_forward.get() !=
nullptr) {
1182 value_to_forward.destruct();
1186 void forward_value_to_input(LockedNode &locked_node,
1187 InputState &input_state,
1188 GMutablePointer value,
1189 CurrentTask ¤t_task)
1191 NodeState &node_state = locked_node.node_state;
1194 BLI_assert(!input_state.was_ready_for_execution);
1195 input_state.value = value.get();
1197 if (input_state.usage == ValueUsage::Used) {
1198 node_state.missing_required_inputs -= 1;
1199 if (node_state.missing_required_inputs == 0 ||
1200 (locked_node.node.is_function() &&
static_cast<const FunctionNode &
>(locked_node.node)
1202 .allow_missing_requested_inputs()))
1204 this->schedule_node(locked_node, current_task,
false);
1209 bool use_multi_threading()
const
1211 return task_pool_.load() !=
nullptr;
/* Switches from single- to multi-threaded execution when allowed. Several lines are
 * missing from this chunk, including where the task pool is actually created
 * (presumably between the params_ check and ensure_thread_locals — TODO confirm). */
1214 bool try_enable_multi_threading()
/* Enabled already. */
1222 if (this->use_multi_threading()) {
1225#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
/* Debug check: only the thread that drives the execution may enable this. */
1228 if (current_main_thread_ != std::this_thread::get_id()) {
/* The caller must support multi-threading too. */
1233 if (!params_->try_enable_multi_threading()) {
1240 this->ensure_thread_locals();
1245 void ensure_thread_locals()
1247#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
1248 if (current_main_thread_ != std::this_thread::get_id()) {
1252 if (!thread_locals_) {
1253 thread_locals_ = std::make_unique<threading::EnumerableThreadSpecific<ThreadLocalStorage>>();
1260 void push_all_scheduled_nodes_to_task_pool(CurrentTask ¤t_task)
1263 std::unique_ptr<ScheduledNodes> scheduled_nodes = std::make_unique<ScheduledNodes>();
1265 std::lock_guard
lock{current_task.mutex};
1266 if (current_task.scheduled_nodes.is_empty()) {
1269 *scheduled_nodes = std::move(current_task.scheduled_nodes);
1270 current_task.has_scheduled_nodes.store(
false, std::memory_order_relaxed);
1272 this->push_to_task_pool(std::move(scheduled_nodes));
1275 void push_to_task_pool(std::unique_ptr<ScheduledNodes> scheduled_nodes)
1282 Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
1283 ScheduledNodes &scheduled_nodes = *static_cast<ScheduledNodes *>(data);
1284 CurrentTask new_current_task;
1285 new_current_task.scheduled_nodes = std::move(scheduled_nodes);
1286 new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
1287 const LocalData local_data = executor.get_local_data();
1288 executor.run_task(new_current_task, local_data);
1290 scheduled_nodes.release(),
1292 [](
TaskPool * ,
void *
data) { delete static_cast<ScheduledNodes *>(data); });
1297 if (!this->use_multi_threading()) {
1298 return {&main_allocator_, context_->local_user_data};
1300 ThreadLocalStorage &local_storage = thread_locals_->local();
1301 if (!local_storage.local_user_data.has_value()) {
1302 local_storage.local_user_data = context_->user_data->get_local(local_storage.allocator);
1304 return {&local_storage.allocator, local_storage.local_user_data->get()};