  Params *params_ = nullptr;
  const Context *context_ = nullptr;
  std::atomic<TaskPool *> task_pool_ = nullptr;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
  std::thread::id current_main_thread_;
#endif

  struct ThreadLocalStorage {
    LinearAllocator<> allocator;
    std::optional<destruct_ptr<LocalUserData>> local_user_data;
  };
  std::unique_ptr<threading::EnumerableThreadSpecific<ThreadLocalStorage>> thread_locals_;

  bool is_first_execution_ = true;
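
  /* Destruct all remaining node states (and any values they still hold) when the executor is
   * destroyed. Only the destructors have to run here; the memory itself lives in one big buffer
   * that is freed together with its allocator. */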
  ~Executor()
  {
    threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
      for (const int node_index : range) {
        const Node &node = *self_.graph_.nodes()[node_index];
        NodeState &node_state = *node_states_[node_index];
        this->destruct_node_state(node, node_state);
      }
    });
  }

  void execute(Params &params, const Context &context)
  {
    params_ = &params;
    context_ = &context;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    current_main_thread_ = std::this_thread::get_id();
#endif
    /* Reset state when this function is done, also in case of an early exit. */
    const auto deferred_func = [&]() {
      is_first_execution_ = false;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
      current_main_thread_ = {};
#endif
    };
    BLI_SCOPED_DEFER(deferred_func);
    const LocalData local_data = this->get_local_data();
    CurrentTask current_task;
    if (is_first_execution_) {
      /* Allocate a single buffer for all node states instead of making many small allocations. */
      char *buffer = static_cast<char *>(
          local_data.allocator->allocate(self_.init_buffer_info_.total_size, alignof(void *)));
      this->initialize_node_states(buffer);
      loaded_inputs_ = {
          reinterpret_cast<std::atomic<uint8_t> *>(
              buffer + self_.init_buffer_info_.loaded_inputs_array_offset),
          self_.graph_inputs_.size()};
      memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));

      this->set_always_unused_graph_inputs();
      this->set_defaulted_graph_outputs(local_data);

      /* Retrieve the nodes that have side effects and therefore have to be executed even when
       * none of their outputs is used. */
      Vector<const FunctionNode *> side_effect_nodes;
      if (self_.side_effect_provider_ != nullptr) {
        side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(context);
        for (const FunctionNode *node : side_effect_nodes) {
          BLI_assert(self_.graph_.nodes().contains(node));
          const int node_index = node->index_in_graph();
          NodeState &node_state = *node_states_[node_index];
          node_state.has_side_effects = true;
        }
      }

      this->initialize_static_value_usages(side_effect_nodes);
      this->schedule_side_effect_nodes(side_effect_nodes, current_task, local_data);
    }

    this->schedule_for_new_output_usages(current_task, local_data);
    this->forward_newly_provided_inputs(current_task, local_data);

    this->run_task(current_task, local_data);
  }

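  /* Construct a NodeState for every node, together with its input and output state arrays,
   * inside the single preallocated buffer. The per-node offsets into the buffer were computed
   * up front in `init_buffer_info_`. */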
  void initialize_node_states(char *buffer)
  {
    const Span<const Node *> nodes = self_.graph_.nodes();
    node_states_ = {
        reinterpret_cast<NodeState **>(buffer + self_.init_buffer_info_.node_states_array_offset),
        nodes.size()};
    threading::parallel_for(nodes.index_range(), 256, [&](const IndexRange range) {
      for (const int i : range) {
        const Node &node = *nodes[i];
        char *memory = buffer + self_.init_buffer_info_.node_states_offsets[i];

        NodeState *node_state = reinterpret_cast<NodeState *>(memory);
        memory += sizeof(NodeState);
        new (node_state) NodeState();

        const int num_inputs = node.inputs().size();
        const int num_outputs = node.outputs().size();
        node_state->inputs = reinterpret_cast<InputState *>(memory);
        memory += sizeof(InputState) * num_inputs;
        node_state->outputs = reinterpret_cast<OutputState *>(memory);

        default_construct_n(node_state->inputs, num_inputs);
        default_construct_n(node_state->outputs, num_outputs);

        node_states_[i] = node_state;
      }
    });
  }

  void destruct_node_state(const Node &node, NodeState &node_state)
  {
    if (node.is_function()) {
      const LazyFunction &fn = static_cast<const FunctionNode &>(node).function();
      if (node_state.storage != nullptr) {
        fn.destruct_storage(node_state.storage);
      }
    }
    for (const int i : node.inputs().index_range()) {
      InputState &input_state = node_state.inputs[i];
      const InputSocket &input_socket = node.input(i);
      this->destruct_input_value_if_exists(input_state, input_socket.type());
    }
    std::destroy_at(&node_state);
  }

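  /* When the executor is entered again, the caller may have requested additional graph outputs
   * since the previous run. Propagate those new usages into the graph; outputs that were set
   * already or are still in the `Maybe` state need no work. */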
  void schedule_for_new_output_usages(CurrentTask &current_task, const LocalData &local_data)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      if (params_->output_was_set(graph_output_index)) {
        continue;
      }
      const ValueUsage output_usage = params_->get_output_usage(graph_output_index);
      if (output_usage == ValueUsage::Maybe) {
        continue;
      }
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      const Node &node = socket.node();
      NodeState &node_state = *node_states_[node.index_in_graph()];
      this->with_locked_node(
          node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            if (output_usage == ValueUsage::Used) {
              this->set_input_required(locked_node, socket);
            }
            else {
              this->set_input_unused(locked_node, socket);
            }
          });
    }
  }

  void set_defaulted_graph_outputs(const LocalData &local_data)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      if (socket.origin() != nullptr) {
        continue;
      }
      /* Unlinked graph outputs get their value from the socket's default. */
      const CPPType &type = socket.type();
      const void *default_value = socket.default_value();

      if (self_.logger_ != nullptr) {
        const Context context{context_->storage, context_->user_data, local_data.local_user_data};
        self_.logger_->log_socket_value(socket, {type, default_value}, context);
      }

      void *output_ptr = params_->get_output_data_ptr(graph_output_index);
      type.copy_construct(default_value, output_ptr);
      params_->output_set(graph_output_index);
    }
  }

  void set_always_unused_graph_inputs()
  {
    for (const int i : self_.graph_inputs_.index_range()) {
      const OutputSocket &socket = *self_.graph_inputs_[i];
      const Node &node = socket.node();
      const NodeState &node_state = *node_states_[node.index_in_graph()];
      const OutputState &output_state = node_state.outputs[socket.index()];
      if (output_state.usage == ValueUsage::Unused) {
        params_->set_input_unused(i);
      }
    }
  }

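  /* Determine socket usages that can be derived statically, before any node runs: nodes that no
   * graph output and no side-effect node depends on can never become used, so their sockets are
   * marked unused up front and never have to be computed. */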
  void initialize_static_value_usages(const Span<const FunctionNode *> side_effect_nodes)
  {
    const Span<const Node *> all_nodes = self_.graph_.nodes();

    /* Used for a search through all nodes that the outputs depend on. */
    Stack<const Node *> reachable_nodes_to_check;
    Array<bool> reachable_node_flags(all_nodes.size(), false);

    /* Graph outputs are always reachable. */
    for (const InputSocket *socket : self_.graph_outputs_) {
      const Node &node = socket->node();
      const int node_index = node.index_in_graph();
      if (!reachable_node_flags[node_index]) {
        reachable_node_flags[node_index] = true;
        reachable_nodes_to_check.push(&node);
      }
    }

    /* Side-effect nodes are always reachable. */
    for (const FunctionNode *node : side_effect_nodes) {
      const int node_index = node->index_in_graph();
      reachable_node_flags[node_index] = true;
      reachable_nodes_to_check.push(node);
    }

    /* Propagate reachability to all nodes that the reachable nodes depend on. */
    while (!reachable_nodes_to_check.is_empty()) {
      const Node &node = *reachable_nodes_to_check.pop();
      for (const InputSocket *input_socket : node.inputs()) {
        const OutputSocket *origin_socket = input_socket->origin();
        if (origin_socket != nullptr) {
          const Node &origin_node = origin_socket->node();
          const int origin_node_index = origin_node.index_in_graph();
          if (!reachable_node_flags[origin_node_index]) {
            reachable_node_flags[origin_node_index] = true;
            reachable_nodes_to_check.push(&origin_node);
          }
        }
      }
    }

    /* Initialize the static usage on all sockets. */
    for (const int node_index : reachable_node_flags.index_range()) {
      const Node &node = *all_nodes[node_index];
      NodeState &node_state = *node_states_[node_index];
      const bool node_is_reachable = reachable_node_flags[node_index];
      if (node_is_reachable) {
        for (const int output_index : node.outputs().index_range()) {
          const OutputSocket &output_socket = node.output(output_index);
          OutputState &output_state = node_state.outputs[output_index];
          int use_count = 0;
          for (const InputSocket *target_socket : output_socket.targets()) {
            const Node &target_node = target_socket->node();
            const bool target_is_reachable = reachable_node_flags[target_node.index_in_graph()];
            /* Only count targets that can still become used. */
            if (target_is_reachable) {
              use_count++;
            }
          }
          output_state.potential_target_sockets = use_count;
          if (use_count == 0) {
            output_state.usage = ValueUsage::Unused;
          }
        }
      }
      else {
        /* Inputs of unreachable nodes are never used. */
        for (const int input_index : node.inputs().index_range()) {
          node_state.inputs[input_index].usage = ValueUsage::Unused;
        }
      }
    }
  }

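  /* Side-effect nodes have to run even if none of their outputs is used, so they are scheduled
   * unconditionally at the start of the execution. */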
  void schedule_side_effect_nodes(const Span<const FunctionNode *> side_effect_nodes,
                                  CurrentTask &current_task,
                                  const LocalData &local_data)
  {
    for (const FunctionNode *node : side_effect_nodes) {
      NodeState &node_state = *node_states_[node->index_in_graph()];
      this->with_locked_node(
          *node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            this->schedule_node(locked_node, current_task, false);
          });
    }
  }

  void forward_newly_provided_inputs(CurrentTask &current_task, const LocalData &local_data)
  {
    for (const int graph_input_index : self_.graph_inputs_.index_range()) {
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        continue;
      }
      void *input_data = params_->try_get_input_data_ptr(graph_input_index);
      if (input_data == nullptr) {
        continue;
      }
      if (was_loaded.fetch_or(1)) {
        /* Another thread is forwarding this input already. */
        continue;
      }
      this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
    }
  }

  void forward_newly_provided_input(CurrentTask &current_task,
                                    const LocalData &local_data,
                                    const int graph_input_index,
                                    void *input_data)
  {
    const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
    const CPPType &type = socket.type();
    void *buffer = local_data.allocator->allocate(type.size(), type.alignment());
    type.move_construct(input_data, buffer);
    this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task, local_data);
  }

  void notify_output_required(const OutputSocket &socket,
                              CurrentTask &current_task,
                              const LocalData &local_data)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    /* The notified socket might be a graph input that has to be loaded from the caller. */
    if (node.is_interface()) {
      const int graph_input_index = self_.graph_input_index_by_socket_index_[socket.index()];
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        return;
      }
      void *input_data = params_->try_get_input_data_ptr_or_request(graph_input_index);
      if (input_data == nullptr) {
        /* The input is not available yet; it is forwarded once it becomes available. */
        return;
      }
      if (was_loaded.fetch_or(1)) {
        /* Another thread is forwarding this input already. */
        return;
      }
      this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
      return;
    }

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          if (output_state.usage == ValueUsage::Used) {
            /* The output is required already, nothing to do. */
            return;
          }
          output_state.usage = ValueUsage::Used;
          this->schedule_node(locked_node, current_task, false);
        });
  }

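  /* Called when a target socket is no longer interested in a value. Once the number of potential
   * users of an output drops to zero, the output becomes unused, which may allow the producing
   * node to finish early or to be skipped entirely. */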
  void notify_output_unused(const OutputSocket &socket,
                            CurrentTask &current_task,
                            const LocalData &local_data)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          output_state.potential_target_sockets -= 1;
          if (output_state.potential_target_sockets == 0) {
            BLI_assert(output_state.usage != ValueUsage::Unused);
            if (output_state.usage == ValueUsage::Maybe) {
              output_state.usage = ValueUsage::Unused;
              if (node.is_interface()) {
                const int graph_input_index =
                    self_.graph_input_index_by_socket_index_[socket.index()];
                params_->set_input_unused(graph_input_index);
              }
              else {
                this->schedule_node(locked_node, current_task, true);
              }
            }
          }
        });
  }

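  /* The schedule state machine below guarantees that a node is never in the scheduled queue more
   * than once: scheduling a node that is scheduled already is a no-op, and scheduling a node
   * that is currently running only marks it to be rescheduled once it is done. */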
  void schedule_node(LockedNode &locked_node, CurrentTask &current_task, const bool is_priority)
  {
    switch (locked_node.node_state.schedule_state) {
      case NodeScheduleState::NotScheduled: {
        locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
        const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
        if (this->use_multi_threading()) {
          std::lock_guard lock{current_task.mutex};
          current_task.scheduled_nodes.schedule(node, is_priority);
        }
        else {
          current_task.scheduled_nodes.schedule(node, is_priority);
        }
        current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
        break;
      }
      case NodeScheduleState::Scheduled: {
        break;
      }
      case NodeScheduleState::Running: {
        locked_node.node_state.schedule_state = NodeScheduleState::RunningAndRescheduled;
        break;
      }
      case NodeScheduleState::RunningAndRescheduled: {
        break;
      }
    }
  }

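  /* Runs the given function while the node's mutex is held (in the multi-threaded case). Usage
   * notifications for other nodes are collected in the LockedNode and only sent after the lock
   * has been released again, because sending them while holding the lock could deadlock when
   * nodes notify each other. */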
  void with_locked_node(const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task,
                        const LocalData &local_data,
                        const FunctionRef<void(LockedNode &)> f)
  {
    BLI_assert(&node_state == node_states_[node.index_in_graph()]);

    LockedNode locked_node{node, node_state};
    if (this->use_multi_threading()) {
      std::lock_guard lock{node_state.mutex};
      threading::isolate_task([&]() { f(locked_node); });
    }
    else {
      f(locked_node);
    }

    this->send_output_required_notifications(
        locked_node.delayed_required_outputs, current_task, local_data);
    this->send_output_unused_notifications(
        locked_node.delayed_unused_outputs, current_task, local_data);
  }

  void send_output_required_notifications(const Span<const OutputSocket *> sockets,
                                          CurrentTask &current_task,
                                          const LocalData &local_data)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_required(*socket, current_task, local_data);
    }
  }

  void send_output_unused_notifications(const Span<const OutputSocket *> sockets,
                                        CurrentTask &current_task,
                                        const LocalData &local_data)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_unused(*socket, current_task, local_data);
    }
  }

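  /* Main worker loop: pop scheduled nodes one by one and run them. When the backlog of scheduled
   * nodes grows large, part of it is split off into a separate task so that other threads can
   * help. */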
  void run_task(CurrentTask &current_task, const LocalData &local_data)
  {
    while (const FunctionNode *node = current_task.scheduled_nodes.pop_next_node()) {
      if (current_task.scheduled_nodes.is_empty()) {
        current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
      }
      this->run_node_task(*node, current_task, local_data);

      if (current_task.scheduled_nodes.nodes_num() > 128) {
        if (this->try_enable_multi_threading()) {
          std::unique_ptr<ScheduledNodes> split_nodes = std::make_unique<ScheduledNodes>();
          current_task.scheduled_nodes.split_into(*split_nodes);
          this->push_to_task_pool(std::move(split_nodes));
        }
      }
    }
  }

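  /* Runs a single scheduled node. Under the node lock this only decides whether the node actually
   * has to execute (it may have been scheduled just to propagate new usage information); the
   * potentially expensive execution itself happens without holding the lock. */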
  void run_node_task(const FunctionNode &node,
                     CurrentTask &current_task,
                     const LocalData &local_data)
  {
    NodeState &node_state = *node_states_[node.index_in_graph()];
    LinearAllocator<> &allocator = *local_data.allocator;
    Context local_context{context_->storage, context_->user_data, local_data.local_user_data};
    const LazyFunction &fn = node.function();

    bool node_needs_execution = false;
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
          node_state.schedule_state = NodeScheduleState::Running;

          if (node_state.node_has_finished) {
            return;
          }

          bool required_uncomputed_output_exists = false;
          for (const int output_index : node.outputs().index_range()) {
            OutputState &output_state = node_state.outputs[output_index];
            output_state.usage_for_execution = output_state.usage;
            if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
              required_uncomputed_output_exists = true;
            }
          }
          if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
            return;
          }

          if (!node_state.always_used_inputs_requested) {
            /* Request all inputs that the function always needs. */
            const Span<Input> fn_inputs = fn.inputs();
            for (const int input_index : fn_inputs.index_range()) {
              const Input &fn_input = fn_inputs[input_index];
              if (fn_input.usage == ValueUsage::Used) {
                const InputSocket &input_socket = node.input(input_index);
                if (input_socket.origin() != nullptr) {
                  this->set_input_required(locked_node, input_socket);
                }
              }
            }
            node_state.always_used_inputs_requested = true;
          }

          for (const int input_index : node.inputs().index_range()) {
            InputState &input_state = node_state.inputs[input_index];
            if (input_state.was_ready_for_execution) {
              continue;
            }
            if (input_state.value != nullptr) {
              input_state.was_ready_for_execution = true;
              continue;
            }
            if (!fn.allow_missing_requested_inputs()) {
              if (input_state.usage == ValueUsage::Used) {
                /* A required input is missing, so the node cannot run yet. */
                return;
              }
            }
          }

          node_needs_execution = true;
        });

    if (node_needs_execution) {
      if (!node_state.storage_and_defaults_initialized) {
        /* Initialize storage and load unlinked input values before the first execution. */
        node_state.storage = fn.init_storage(allocator);

        for (const int input_index : node.inputs().index_range()) {
          const InputSocket &input_socket = node.input(input_index);
          if (input_socket.origin() != nullptr) {
            /* Linked inputs are forwarded to the node separately. */
            continue;
          }
          InputState &input_state = node_state.inputs[input_index];
          const CPPType &type = input_socket.type();
          const void *default_value = input_socket.default_value();

          if (self_.logger_ != nullptr) {
            self_.logger_->log_socket_value(input_socket, {type, default_value}, local_context);
          }

          input_state.value = allocator.allocate(type.size(), type.alignment());
          type.copy_construct(default_value, input_state.value);
          input_state.was_ready_for_execution = true;
        }
        node_state.storage_and_defaults_initialized = true;
      }

      this->execute_node(node, node_state, current_task, local_data);
    }

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          if (node_needs_execution) {
            this->assert_expected_outputs_have_been_computed(locked_node, local_data);
          }
          this->finish_node_if_possible(locked_node);
          const bool reschedule_requested = node_state.schedule_state ==
                                            NodeScheduleState::RunningAndRescheduled;
          node_state.schedule_state = NodeScheduleState::NotScheduled;
          if (reschedule_requested && !node_state.node_has_finished) {
            this->schedule_node(locked_node, current_task, false);
          }
        });
  }

  void assert_expected_outputs_have_been_computed(LockedNode &locked_node,
                                                  const LocalData &local_data)
  {
    const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
    const NodeState &node_state = locked_node.node_state;

    if (node_state.missing_required_inputs > 0) {
      return;
    }
    if (node_state.schedule_state == NodeScheduleState::RunningAndRescheduled) {
      return;
    }
    Vector<const OutputSocket *> missing_outputs;
    for (const int i : node.outputs().index_range()) {
      const OutputState &output_state = node_state.outputs[i];
      if (output_state.usage_for_execution == ValueUsage::Used) {
        if (!output_state.has_been_computed) {
          missing_outputs.append(&node.output(i));
        }
      }
    }
    if (!missing_outputs.is_empty()) {
      if (self_.logger_ != nullptr) {
        const Context context{context_->storage, context_->user_data, local_data.local_user_data};
        self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, context);
      }
      BLI_assert_unreachable();
    }
  }

  void finish_node_if_possible(LockedNode &locked_node)
  {
    const Node &node = locked_node.node;
    NodeState &node_state = locked_node.node_state;

    if (node_state.node_has_finished) {
      /* Finished already. */
      return;
    }
    /* If there are outputs that may still be used, the node is not done yet. */
    for (const int output_index : node.outputs().index_range()) {
      const OutputState &output_state = node_state.outputs[output_index];
      if (output_state.usage != ValueUsage::Unused && !output_state.has_been_computed) {
        return;
      }
    }
    /* If the node is still waiting for an input, it is not done yet. */
    for (const int input_index : node.inputs().index_range()) {
      const InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Used && !input_state.was_ready_for_execution) {
        return;
      }
    }

    node_state.node_has_finished = true;

    /* Release remaining input values and tell their origins that they are not needed anymore. */
    for (const int input_index : node.inputs().index_range()) {
      const InputSocket &input_socket = node.input(input_index);
      InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Maybe) {
        this->set_input_unused(locked_node, input_socket);
      }
      else if (input_state.usage == ValueUsage::Used) {
        this->destruct_input_value_if_exists(input_state, input_socket.type());
      }
    }

    if (node_state.storage != nullptr) {
      if (node.is_function()) {
        const FunctionNode &fn_node = static_cast<const FunctionNode &>(node);
        fn_node.function().destruct_storage(node_state.storage);
      }
      node_state.storage = nullptr;
    }
  }

  void destruct_input_value_if_exists(InputState &input_state, const CPPType &type)
  {
    if (input_state.value != nullptr) {
      type.destruct(input_state.value);
      input_state.value = nullptr;
    }
  }

  void execute_node(const FunctionNode &node,
                    NodeState &node_state,
                    CurrentTask &current_task,
                    const LocalData &local_data);

  void set_input_unused_during_execution(const Node &node,
                                         NodeState &node_state,
                                         const int input_index,
                                         CurrentTask &current_task,
                                         const LocalData &local_data)
  {
    const InputSocket &input_socket = node.input(input_index);
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          this->set_input_unused(locked_node, input_socket);
        });
  }

  void set_input_unused(LockedNode &locked_node, const InputSocket &input_socket)
  {
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Used);
    if (input_state.usage == ValueUsage::Unused) {
      return;
    }
    input_state.usage = ValueUsage::Unused;

    this->destruct_input_value_if_exists(input_state, input_socket.type());
    if (input_state.was_ready_for_execution) {
      return;
    }
    const OutputSocket *origin = input_socket.origin();
    if (origin != nullptr) {
      locked_node.delayed_unused_outputs.append(origin);
    }
  }

  void *set_input_required_during_execution(const Node &node,
                                            NodeState &node_state,
                                            const int input_index,
                                            CurrentTask &current_task,
                                            const LocalData &local_data)
  {
    const InputSocket &input_socket = node.input(input_index);
    void *result;
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          result = this->set_input_required(locked_node, input_socket);
        });
    return result;
  }

  void *set_input_required(LockedNode &locked_node, const InputSocket &input_socket)
  {
    BLI_assert(&locked_node.node == &input_socket.node());
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Unused);

    if (input_state.value != nullptr) {
      input_state.was_ready_for_execution = true;
      return input_state.value;
    }
    if (input_state.usage == ValueUsage::Used) {
      /* The input was requested before but is not available yet. */
      return nullptr;
    }
    input_state.usage = ValueUsage::Used;
    node_state.missing_required_inputs += 1;

    const OutputSocket *origin_socket = input_socket.origin();
    /* Unlinked inputs are forwarded before the node is executed, so there must be an origin. */
    BLI_assert(origin_socket != nullptr);
    locked_node.delayed_required_outputs.append(origin_socket);
    return nullptr;
  }

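  /* Forward a computed value to every linked target socket. All targets except the last one get
   * a copy; the last target receives the value itself via move, so a value with a single user is
   * never copied. */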
  void forward_value_to_linked_inputs(const OutputSocket &from_socket,
                                      GMutablePointer value_to_forward,
                                      CurrentTask &current_task,
                                      const LocalData &local_data)
  {
    BLI_assert(value_to_forward.get() != nullptr);
    const CPPType &type = *value_to_forward.type();
    const Context local_context{
        context_->storage, context_->user_data, local_data.local_user_data};

    if (self_.logger_ != nullptr) {
      self_.logger_->log_socket_value(from_socket, value_to_forward, local_context);
    }

    const Span<const InputSocket *> targets = from_socket.targets();
    for (const InputSocket *target_socket : targets) {
      const Node &target_node = target_socket->node();
      NodeState &node_state = *node_states_[target_node.index_in_graph()];
      const int input_index = target_socket->index();
      InputState &input_state = node_state.inputs[input_index];
      const bool is_last_target = target_socket == targets.last();

      if (input_state.value != nullptr) {
        if (self_.logger_ != nullptr) {
          self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, local_context);
        }
        BLI_assert_unreachable();
      }
      BLI_assert(!input_state.was_ready_for_execution);
      BLI_assert(target_socket->origin() == &from_socket);

      if (self_.logger_ != nullptr) {
        self_.logger_->log_socket_value(*target_socket, value_to_forward, local_context);
      }
      if (target_node.is_interface()) {
        /* Forward the value to the outside of the graph. */
        const int graph_output_index =
            self_.graph_output_index_by_socket_index_[target_socket->index()];
        if (graph_output_index != -1 &&
            params_->get_output_usage(graph_output_index) != ValueUsage::Unused)
        {
          void *dst_buffer = params_->get_output_data_ptr(graph_output_index);
          if (is_last_target) {
            type.move_construct(value_to_forward.get(), dst_buffer);
          }
          else {
            type.copy_construct(value_to_forward.get(), dst_buffer);
          }
          params_->output_set(graph_output_index);
        }
        continue;
      }
      this->with_locked_node(
          target_node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            if (input_state.usage == ValueUsage::Unused) {
              /* No need to forward the value to an input that is not used. */
              return;
            }
            if (is_last_target) {
              /* The last target takes ownership of the value, so no copy is needed. */
              this->forward_value_to_input(
                  locked_node, input_state, value_to_forward, current_task);
              value_to_forward = {};
            }
            else {
              void *buffer = local_data.allocator->allocate(type.size(), type.alignment());
              type.copy_construct(value_to_forward.get(), buffer);
              this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
            }
          });
    }
    if (value_to_forward.get() != nullptr) {
      value_to_forward.destruct();
    }
  }

  void forward_value_to_input(LockedNode &locked_node,
                              InputState &input_state,
                              GMutablePointer value,
                              CurrentTask &current_task)
  {
    NodeState &node_state = locked_node.node_state;

    BLI_assert(!input_state.was_ready_for_execution);
    input_state.value = value.get();

    if (input_state.usage == ValueUsage::Used) {
      node_state.missing_required_inputs -= 1;
      if (node_state.missing_required_inputs == 0 ||
          (locked_node.node.is_function() &&
           static_cast<const FunctionNode &>(locked_node.node)
               .allow_missing_requested_inputs()))
      {
        this->schedule_node(locked_node, current_task, false);
      }
    }
  }

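  /* Execution starts single-threaded. Multi-threading (and with it the task pool and all mutex
   * locking) is only enabled once there is enough work to justify the overhead, and only if the
   * caller supports it. */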
  bool use_multi_threading() const
  {
    return task_pool_.load() != nullptr;
  }

  bool try_enable_multi_threading()
  {
    if (this->use_multi_threading()) {
      return true;
    }
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    /* Only the main thread may enable multi-threading, because the executor must not be used by
     * multiple threads before it is enabled. */
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    if (!params_->try_enable_multi_threading()) {
      return false;
    }
    this->ensure_thread_locals();
    task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
    return true;
  }

  void ensure_thread_locals()
  {
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    if (!thread_locals_) {
      thread_locals_ = std::make_unique<threading::EnumerableThreadSpecific<ThreadLocalStorage>>();
    }
  }

  void push_all_scheduled_nodes_to_task_pool(CurrentTask &current_task)
  {
    BLI_assert(this->use_multi_threading());
    std::unique_ptr<ScheduledNodes> scheduled_nodes = std::make_unique<ScheduledNodes>();
    {
      std::lock_guard lock{current_task.mutex};
      if (current_task.scheduled_nodes.is_empty()) {
        return;
      }
      *scheduled_nodes = std::move(current_task.scheduled_nodes);
      current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
    }
    this->push_to_task_pool(std::move(scheduled_nodes));
  }

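  /* Hand a batch of scheduled nodes over to the task pool. Ownership of the ScheduledNodes is
   * transferred to the pool as a raw pointer; the free callback passed to the pool deletes it
   * after the task has run. */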
  void push_to_task_pool(std::unique_ptr<ScheduledNodes> scheduled_nodes)
  {
    /* All scheduled nodes are pushed as a single task. This avoids unnecessary task pool
     * overhead when the nodes are cheap to compute. */
    BLI_task_pool_push(
        task_pool_.load(),
        [](TaskPool *pool, void *data) {
          Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
          ScheduledNodes &scheduled_nodes = *static_cast<ScheduledNodes *>(data);
          CurrentTask new_current_task;
          new_current_task.scheduled_nodes = std::move(scheduled_nodes);
          new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
          const LocalData local_data = executor.get_local_data();
          executor.run_task(new_current_task, local_data);
        },
        scheduled_nodes.release(),
        true,
        [](TaskPool * /*pool*/, void *data) { delete static_cast<ScheduledNodes *>(data); });
  }

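  /* Get the allocator and local user data for the current thread. As long as execution is
   * single-threaded, the main allocator and the caller-provided local user data are used
   * directly; once multi-threading is enabled, every thread gets its own. */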
  LocalData get_local_data()
  {
    if (!this->use_multi_threading()) {
      return {&main_allocator_, context_->local_user_data};
    }
    ThreadLocalStorage &local_storage = thread_locals_->local();
    if (!local_storage.local_user_data.has_value()) {
      local_storage.local_user_data = context_->user_data->get_local(local_storage.allocator);
    }
    return {&local_storage.allocator, local_storage.local_user_data->get()};
  }