Blender V5.0
lazy_function_graph_executor.cc
/* SPDX-FileCopyrightText: 2023 Blender Authors
 *
 * SPDX-License-Identifier: GPL-2.0-or-later */

#include <atomic>

#include "BLI_array.hh"
#include "BLI_function_ref.hh"
#include "BLI_mutex.hh"
#include "BLI_stack.hh"
#include "BLI_task.h"
#include "BLI_task.hh"

#include "FN_lazy_function_graph_executor.hh"

namespace blender::fn::lazy_function {

enum class NodeScheduleState {
  /** Default state of every node. */
  NotScheduled,
  /** The node has been added to the set of nodes that should run. */
  Scheduled,
  /** The node is currently running. */
  Running,
  /**
   * The node is running and has been rescheduled while running. Instead of being queued a second
   * time, it reschedules itself once its current run is done.
   */
  RunningAndRescheduled,
};

struct InputState {
  /**
   * Value of this input socket. Computed values are forwarded here and live here until it is
   * known that they are not needed anymore.
   */
  void *value = nullptr;
  /** How the node intends to use this input. By default, any input may be used. */
  ValueUsage usage = ValueUsage::Maybe;
  /** Set to true once #value is set and stays true afterwards. */
  bool was_ready_for_execution = false;
};

struct OutputState {
  /** Keeps track of how the output value is used by the sockets it is linked to. */
  ValueUsage usage = ValueUsage::Maybe;
  /** Copy of #usage that is valid during the execution of the node. */
  ValueUsage usage_for_execution = ValueUsage::Maybe;
  /** Number of linked input sockets that might still use the value of this output. */
  int potential_target_sockets = 0;
  /** True once the value of this output has been computed. */
  bool has_been_computed = false;
  /** Value computed by the node, before it is forwarded to linked input sockets. */
  void *value = nullptr;
};

struct NodeState {
  /** Protects the state below that is not explicitly safe to access without a lock. */
  Mutex mutex;
  /** States of the individual input and output sockets of the node. */
  InputState *inputs;
  OutputState *outputs;
  /** Number of required inputs that have not been provided yet. */
  int missing_required_inputs = 0;
  /** True once the node is done, i.e. all outputs that may be used have been computed. */
  bool node_has_finished = false;
  /** True once all inputs that are always required have been requested. */
  bool always_used_inputs_requested = false;
  /** True once the node storage and unlinked default inputs have been initialized. */
  bool storage_and_defaults_initialized = false;
  /** True when this node has side effects and has to be executed even without used outputs. */
  bool has_side_effects = false;
  /** True when this node enabled multi-threading for its own execution. */
  bool enabled_multi_threading = false;
  /** Current scheduling state of the node. */
  NodeScheduleState schedule_state = NodeScheduleState::NotScheduled;
  /** Custom storage of the node, created by #LazyFunction::init_storage. */
  void *storage = nullptr;
};

/**
 * Wraps a node whose state is currently locked. Notifications to other nodes are delayed until
 * the lock has been released, because sending them while holding the lock could cause deadlocks.
 */
struct LockedNode {
  const Node &node;
  NodeState &node_state;

  /** Sockets that should be notified once the node lock has been released. */
  Vector<const OutputSocket *> delayed_required_outputs;
  Vector<const OutputSocket *> delayed_unused_outputs;

  LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state) {}
};

class Executor;
class GraphExecutorLFParams;

/**
 * Keeps track of the nodes that are scheduled for execution. Priority nodes are popped first,
 * because executing them can free up memory sooner.
 */
class ScheduledNodes {
 private:
  Vector<const FunctionNode *> priority_;
  Vector<const FunctionNode *> normal_;

 public:
  void schedule(const FunctionNode &node, const bool is_priority)
  {
    if (is_priority) {
      this->priority_.append(&node);
    }
    else {
      this->normal_.append(&node);
    }
  }

  const FunctionNode *pop_next_node()
  {
    if (!this->priority_.is_empty()) {
      return this->priority_.pop_last();
    }
    if (!this->normal_.is_empty()) {
      return this->normal_.pop_last();
    }
    return nullptr;
  }

  bool is_empty() const
  {
    return this->priority_.is_empty() && this->normal_.is_empty();
  }

  int64_t nodes_num() const
  {
    return priority_.size() + normal_.size();
  }

  /** Move half of the scheduled nodes into another #ScheduledNodes instance. */
  void split_into(ScheduledNodes &other)
  {
    BLI_assert(this != &other);
    const int64_t priority_split = priority_.size() / 2;
    const int64_t normal_split = normal_.size() / 2;
    other.priority_.extend(priority_.as_span().drop_front(priority_split));
    other.normal_.extend(normal_.as_span().drop_front(normal_split));
    priority_.resize(priority_split);
    normal_.resize(normal_split);
  }
};
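
/* Design note (inferred from #split_into above): both queues are split in half instead of handing
 * off the whole backlog, so the current thread and the thread that picks up the split keep
 * roughly equal amounts of work. */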

struct CurrentTask {
  /** Protects #scheduled_nodes when multi-threading is enabled. */
  Mutex mutex;
  /** Nodes that have been scheduled by the task that is currently running. */
  ScheduledNodes scheduled_nodes;
  /** Cheaper to check than locking #mutex to see whether any nodes are scheduled. */
  std::atomic<bool> has_scheduled_nodes = false;
};

class Executor {
 private:
  const GraphExecutor &self_;
  /** Remembers which graph inputs have been loaded from the caller already. */
  MutableSpan<std::atomic<uint8_t>> loaded_inputs_;
  /** State of every node, indexed by the index of the node in the graph. */
  MutableSpan<NodeState *> node_states_;
  /** Parameters and context provided by the caller. Only valid during #execute. */
  Params *params_ = nullptr;
  const Context *context_ = nullptr;
  /** Task pool used to distribute work to threads. Only created when multi-threading is enabled. */
  std::atomic<TaskPool *> task_pool_ = nullptr;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
  std::thread::id current_main_thread_;
#endif

  struct ThreadLocalStorage {
    LinearAllocator<> allocator;
    std::optional<destruct_ptr<LocalUserData>> local_user_data;
  };
  std::unique_ptr<threading::EnumerableThreadSpecific<ThreadLocalStorage>> thread_locals_;
  LinearAllocator<> main_allocator_;
  /** Set to false after the first execution, because some initialization is only done once. */
  bool is_first_execution_ = true;

  friend GraphExecutorLFParams;

  /** Data that is local to the thread that currently runs part of the execution. */
  struct LocalData {
    LinearAllocator<> *allocator;
    LocalUserData *local_user_data;
  };

 public:
  Executor(const GraphExecutor &self) : self_(self)
  {
    /* The indices are necessary, because they are used as keys in #node_states_. */
    BLI_assert(self_.graph_.node_indices_are_valid());
  }

  ~Executor()
  {
    if (TaskPool *task_pool = task_pool_.load()) {
      BLI_task_pool_free(task_pool);
    }
    threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
      for (const int node_index : range) {
        const Node &node = *self_.graph_.nodes()[node_index];
        NodeState &node_state = *node_states_[node_index];
        this->destruct_node_state(node, node_state);
      }
    });
  }

  /**
   * Main entry point to the execution of this graph.
   */
  void execute(Params &params, const Context &context)
  {
    params_ = &params;
    context_ = &context;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    current_main_thread_ = std::this_thread::get_id();
#endif
    const auto deferred_func = [&]() {
      /* Make sure the pointers are not dangling, even though they shouldn't be accessed by
       * anyone. */
      params_ = nullptr;
      context_ = nullptr;
      is_first_execution_ = false;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
      current_main_thread_ = {};
#endif
    };
    BLI_SCOPED_DEFER(deferred_func);

    const LocalData local_data = this->get_local_data();

    CurrentTask current_task;
    if (is_first_execution_) {
      /* Allocate a single large buffer instead of making many smaller allocations below. */
      char *buffer = static_cast<char *>(
          local_data.allocator->allocate(self_.init_buffer_info_.total_size, alignof(void *)));
      this->initialize_node_states(buffer);

      loaded_inputs_ = MutableSpan{
          reinterpret_cast<std::atomic<uint8_t> *>(
              buffer + self_.init_buffer_info_.loaded_inputs_array_offset),
          self_.graph_inputs_.size()};
      /* Initialize atomics to zero. */
      memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));

      this->set_always_unused_graph_inputs();
      this->set_defaulted_graph_outputs(local_data);

      /* Retrieve and tag side effect nodes. */
      Vector<const FunctionNode *> side_effect_nodes;
      if (self_.side_effect_provider_ != nullptr) {
        side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(context);
        for (const FunctionNode *node : side_effect_nodes) {
          BLI_assert(self_.graph_.nodes().contains(node));
          const int node_index = node->index_in_graph();
          NodeState &node_state = *node_states_[node_index];
          node_state.has_side_effects = true;
        }
      }

      this->initialize_static_value_usages(side_effect_nodes);
      this->schedule_side_effect_nodes(side_effect_nodes, current_task, local_data);
    }

    this->schedule_for_new_output_usages(current_task, local_data);
    this->forward_newly_provided_inputs(current_task, local_data);

    this->run_task(current_task, local_data);

    if (TaskPool *task_pool = task_pool_.load()) {
      BLI_task_pool_work_and_wait(task_pool);
    }
  }

 private:
  void initialize_node_states(char *buffer)
  {
    Span<const Node *> nodes = self_.graph_.nodes();
    node_states_ = MutableSpan{
        reinterpret_cast<NodeState **>(buffer + self_.init_buffer_info_.node_states_array_offset),
        nodes.size()};

    threading::parallel_for(nodes.index_range(), 1024, [&](const IndexRange range) {
      for (const int i : range) {
        const Node &node = *nodes[i];
        char *memory = buffer + self_.init_buffer_info_.node_states_offsets[i];

        /* Initialize node state. */
        NodeState *node_state = reinterpret_cast<NodeState *>(memory);
        memory += sizeof(NodeState);
        new (node_state) NodeState();

        /* Initialize socket states. */
        const int num_inputs = node.inputs().size();
        const int num_outputs = node.outputs().size();
        node_state->inputs = reinterpret_cast<InputState *>(memory);
        memory += sizeof(InputState) * num_inputs;
        node_state->outputs = reinterpret_cast<OutputState *>(memory);

        default_construct_n(node_state->inputs, num_inputs);
        default_construct_n(node_state->outputs, num_outputs);

        node_states_[i] = node_state;
      }
    });
  }

  void destruct_node_state(const Node &node, NodeState &node_state)
  {
    if (node.is_function()) {
      const LazyFunction &fn = static_cast<const FunctionNode &>(node).function();
      if (node_state.storage != nullptr) {
        fn.destruct_storage(node_state.storage);
      }
    }
    for (const int i : node.inputs().index_range()) {
      InputState &input_state = node_state.inputs[i];
      const InputSocket &input_socket = node.input(i);
      this->destruct_input_value_if_exists(input_state, input_socket.type());
    }
    std::destroy_at(&node_state);
  }

  /**
   * Notifies nodes that produce graph outputs whose usage changed since the last execution
   * (e.g. an output that is now required or is now known to be unused).
   */
  void schedule_for_new_output_usages(CurrentTask &current_task, const LocalData &local_data)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      if (params_->output_was_set(graph_output_index)) {
        continue;
      }
      const ValueUsage output_usage = params_->get_output_usage(graph_output_index);
      if (output_usage == ValueUsage::Maybe) {
        continue;
      }
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      const Node &node = socket.node();
      NodeState &node_state = *node_states_[node.index_in_graph()];
      this->with_locked_node(
          node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            if (output_usage == ValueUsage::Used) {
              this->set_input_required(locked_node, socket);
            }
            else {
              this->set_input_unused(locked_node, socket);
            }
          });
    }
  }

  void set_defaulted_graph_outputs(const LocalData &local_data)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      if (socket.origin() != nullptr) {
        continue;
      }
      const CPPType &type = socket.type();
      const void *default_value = socket.default_value();
      BLI_assert(default_value != nullptr);

      if (self_.logger_ != nullptr) {
        const Context context{context_->storage, context_->user_data, local_data.local_user_data};
        self_.logger_->log_socket_value(socket, {type, default_value}, context);
      }

      void *output_ptr = params_->get_output_data_ptr(graph_output_index);
      type.copy_construct(default_value, output_ptr);
      params_->output_set(graph_output_index);
    }
  }

  void set_always_unused_graph_inputs()
  {
    for (const int i : self_.graph_inputs_.index_range()) {
      const OutputSocket &socket = *self_.graph_inputs_[i];
      const Node &node = socket.node();
      const NodeState &node_state = *node_states_[node.index_in_graph()];
      const OutputState &output_state = node_state.outputs[socket.index()];
      if (output_state.usage == ValueUsage::Unused) {
        params_->set_input_unused(i);
      }
    }
  }

  /**
   * Determines which nodes are reachable from the graph outputs and the side effect nodes, and
   * initializes the static input/output usages based on that. Unreachable nodes are never
   * executed, so their inputs are marked as unused up front.
   */
  void initialize_static_value_usages(const Span<const FunctionNode *> side_effect_nodes)
  {
    const Span<const Node *> all_nodes = self_.graph_.nodes();

    /* Used for a search through all nodes that the outputs depend on. */
    Stack<const Node *> reachable_nodes_to_check;
    Array<bool> reachable_node_flags(all_nodes.size(), false);

    /* Graph outputs are always reachable. */
    for (const InputSocket *socket : self_.graph_outputs_) {
      const Node &node = socket->node();
      const int node_index = node.index_in_graph();
      if (!reachable_node_flags[node_index]) {
        reachable_node_flags[node_index] = true;
        reachable_nodes_to_check.push(&node);
      }
    }

    /* Side effect nodes are always reachable. */
    for (const FunctionNode *node : side_effect_nodes) {
      const int node_index = node->index_in_graph();
      reachable_node_flags[node_index] = true;
      reachable_nodes_to_check.push(node);
    }

    /* Tag every node that reachable nodes depend on, using a depth-first search. */
    while (!reachable_nodes_to_check.is_empty()) {
      const Node &node = *reachable_nodes_to_check.pop();
      for (const InputSocket *input_socket : node.inputs()) {
        const OutputSocket *origin_socket = input_socket->origin();
        if (origin_socket != nullptr) {
          const Node &origin_node = origin_socket->node();
          const int origin_node_index = origin_node.index_in_graph();
          if (!reachable_node_flags[origin_node_index]) {
            reachable_node_flags[origin_node_index] = true;
            reachable_nodes_to_check.push(&origin_node);
          }
        }
      }
    }

    for (const int node_index : reachable_node_flags.index_range()) {
      const Node &node = *all_nodes[node_index];
      NodeState &node_state = *node_states_[node_index];
      const bool node_is_reachable = reachable_node_flags[node_index];
      if (node_is_reachable) {
        for (const int output_index : node.outputs().index_range()) {
          const OutputSocket &output_socket = node.output(output_index);
          OutputState &output_state = node_state.outputs[output_index];
          int use_count = 0;
          for (const InputSocket *target_socket : output_socket.targets()) {
            const Node &target_node = target_socket->node();
            const bool target_is_reachable = reachable_node_flags[target_node.index_in_graph()];
            /* Only count targets that are reachable. */
            if (target_is_reachable) {
              use_count++;
            }
          }
          output_state.potential_target_sockets = use_count;
          if (use_count == 0) {
            output_state.usage = ValueUsage::Unused;
          }
        }
      }
      else {
        /* Inputs of unreachable nodes are unused. */
        for (const int input_index : node.inputs().index_range()) {
          node_state.inputs[input_index].usage = ValueUsage::Unused;
        }
      }
    }
  }

  void schedule_side_effect_nodes(const Span<const FunctionNode *> side_effect_nodes,
                                  CurrentTask &current_task,
                                  const LocalData &local_data)
  {
    for (const FunctionNode *node : side_effect_nodes) {
      NodeState &node_state = *node_states_[node->index_in_graph()];
      this->with_locked_node(
          *node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            this->schedule_node(locked_node, current_task, false);
          });
    }
  }

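  /* Summary of the logic below: each graph input has an atomic flag in #loaded_inputs_. The
   * initial load() cheaply skips inputs that were already forwarded, while fetch_or(1) ensures
   * that exactly one caller forwards each provided value, even under concurrency. */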
  void forward_newly_provided_inputs(CurrentTask &current_task, const LocalData &local_data)
  {
    for (const int graph_input_index : self_.graph_inputs_.index_range()) {
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        continue;
      }
      void *input_data = params_->try_get_input_data_ptr(graph_input_index);
      if (input_data == nullptr) {
        continue;
      }
      if (was_loaded.fetch_or(1)) {
        /* The value was forwarded before. */
        continue;
      }
      this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
    }
  }

  void forward_newly_provided_input(CurrentTask &current_task,
                                    const LocalData &local_data,
                                    const int graph_input_index,
                                    void *input_data)
  {
    const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
    const CPPType &type = socket.type();
    void *buffer = local_data.allocator->allocate(type);
    type.move_construct(input_data, buffer);
    this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task, local_data);
  }

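  /* Called when the value of an output socket becomes required. If the socket is an input of the
   * whole graph, the value is requested from the caller; otherwise the node that produces it is
   * scheduled. */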
  void notify_output_required(const OutputSocket &socket,
                              CurrentTask &current_task,
                              const LocalData &local_data)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    /* The notified output socket might be an input of the entire graph. In this case, notify the
     * caller that the input is required. */
    if (node.is_interface()) {
      const int graph_input_index = self_.graph_input_index_by_socket_index_[socket.index()];
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        return;
      }
      void *input_data = params_->try_get_input_data_ptr_or_request(graph_input_index);
      if (input_data == nullptr) {
        return;
      }
      if (was_loaded.fetch_or(1)) {
        /* The value was forwarded already. */
        return;
      }
      this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
      return;
    }

    BLI_assert(node.is_function());
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          if (output_state.usage == ValueUsage::Used) {
            return;
          }
          output_state.usage = ValueUsage::Used;
          this->schedule_node(locked_node, current_task, false);
        });
  }

  void notify_output_unused(const OutputSocket &socket,
                            CurrentTask &current_task,
                            const LocalData &local_data)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          output_state.potential_target_sockets -= 1;
          if (output_state.potential_target_sockets == 0) {
            BLI_assert(output_state.usage != ValueUsage::Unused);
            if (output_state.usage == ValueUsage::Maybe) {
              output_state.usage = ValueUsage::Unused;
              if (node.is_interface()) {
                const int graph_input_index =
                    self_.graph_input_index_by_socket_index_[socket.index()];
                params_->set_input_unused(graph_input_index);
              }
              else {
                /* Schedule as priority node. This allows freeing up memory earlier which results
                 * in better memory reuse and fewer implicit sharing copies. */
                this->schedule_node(locked_node, current_task, true);
              }
            }
          }
        });
  }

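  /* Scheduling state machine (summary of the switch below): NotScheduled -> Scheduled queues the
   * node; a Running node is only marked RunningAndRescheduled, so a node is never queued twice at
   * the same time and instead reschedules itself when its current run is done. */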
  void schedule_node(LockedNode &locked_node, CurrentTask &current_task, const bool is_priority)
  {
    BLI_assert(locked_node.node.is_function());
    switch (locked_node.node_state.schedule_state) {
      case NodeScheduleState::NotScheduled: {
        locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
        const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
        if (this->use_multi_threading()) {
          std::lock_guard lock{current_task.mutex};
          current_task.scheduled_nodes.schedule(node, is_priority);
        }
        else {
          current_task.scheduled_nodes.schedule(node, is_priority);
        }
        current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
        break;
      }
      case NodeScheduleState::Scheduled: {
        break;
      }
      case NodeScheduleState::Running: {
        locked_node.node_state.schedule_state = NodeScheduleState::RunningAndRescheduled;
        break;
      }
      case NodeScheduleState::RunningAndRescheduled: {
        break;
      }
    }
  }

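  /* Runs #f with the node mutex locked when multi-threading is enabled. The callback runs
   * task-isolated so that no unrelated task can start on this thread while the lock is held, and
   * notifications to other nodes requested by #f are sent only after the lock has been released.
   * Both measures avoid holding multiple node locks at once, which could deadlock. */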
  void with_locked_node(const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task,
                        const LocalData &local_data,
                        const FunctionRef<void(LockedNode &)> f)
  {
    BLI_assert(&node_state == node_states_[node.index_in_graph()]);

    LockedNode locked_node{node, node_state};
    if (this->use_multi_threading()) {
      std::lock_guard lock{node_state.mutex};
      threading::isolate_task([&]() { f(locked_node); });
    }
    else {
      f(locked_node);
    }

    this->send_output_required_notifications(
        locked_node.delayed_required_outputs, current_task, local_data);
    this->send_output_unused_notifications(
        locked_node.delayed_unused_outputs, current_task, local_data);
  }

  void send_output_required_notifications(const Span<const OutputSocket *> sockets,
                                          CurrentTask &current_task,
                                          const LocalData &local_data)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_required(*socket, current_task, local_data);
    }
  }

  void send_output_unused_notifications(const Span<const OutputSocket *> sockets,
                                        CurrentTask &current_task,
                                        const LocalData &local_data)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_unused(*socket, current_task, local_data);
    }
  }

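  /* Executes scheduled nodes until the current task runs out of work. When many nodes pile up,
   * half of them are split off and pushed to the task pool so that other threads can help. */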
  void run_task(CurrentTask &current_task, const LocalData &local_data)
  {
    while (const FunctionNode *node = current_task.scheduled_nodes.pop_next_node()) {
      if (current_task.scheduled_nodes.is_empty()) {
        current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
      }
      this->run_node_task(*node, current_task, local_data);

      /* If there are many nodes scheduled at the same time, it's beneficial to let multiple
       * threads work on those. */
      if (current_task.scheduled_nodes.nodes_num() > 128) {
        if (this->try_enable_multi_threading()) {
          std::unique_ptr<ScheduledNodes> split_nodes = std::make_unique<ScheduledNodes>();
          current_task.scheduled_nodes.split_into(*split_nodes);
          this->push_to_task_pool(std::move(split_nodes));
        }
      }
    }
  }

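  /* Runs a single node: decides under the node lock whether execution is necessary, lazily
   * initializes storage and unlinked default inputs, executes the function while the node is
   * unlocked, and finally reschedules the node if that was requested while it ran. */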
  void run_node_task(const FunctionNode &node,
                     CurrentTask &current_task,
                     const LocalData &local_data)
  {
    NodeState &node_state = *node_states_[node.index_in_graph()];
    LinearAllocator<> &allocator = *local_data.allocator;
    Context local_context{context_->storage, context_->user_data, local_data.local_user_data};
    const LazyFunction &fn = node.function();

    bool node_needs_execution = false;
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
          node_state.schedule_state = NodeScheduleState::Running;

          if (node_state.node_has_finished) {
            return;
          }

          bool required_uncomputed_output_exists = false;
          for (const int output_index : node.outputs().index_range()) {
            OutputState &output_state = node_state.outputs[output_index];
            output_state.usage_for_execution = output_state.usage;
            if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
              required_uncomputed_output_exists = true;
            }
          }
          if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
            return;
          }

          if (!node_state.always_used_inputs_requested) {
            /* Request linked inputs that are always needed. */
            const Span<Input> fn_inputs = fn.inputs();
            for (const int input_index : fn_inputs.index_range()) {
              const Input &fn_input = fn_inputs[input_index];
              if (fn_input.usage == ValueUsage::Used) {
                const InputSocket &input_socket = node.input(input_index);
                if (input_socket.origin() != nullptr) {
                  this->set_input_required(locked_node, input_socket);
                }
              }
            }

            node_state.always_used_inputs_requested = true;
          }

          for (const int input_index : node.inputs().index_range()) {
            InputState &input_state = node_state.inputs[input_index];
            if (input_state.was_ready_for_execution) {
              continue;
            }
            if (input_state.value != nullptr) {
              input_state.was_ready_for_execution = true;
              continue;
            }
            if (!fn.allow_missing_requested_inputs()) {
              if (input_state.usage == ValueUsage::Used) {
                return;
              }
            }
          }

          node_needs_execution = true;
        });

    if (node_needs_execution) {
      if (!node_state.storage_and_defaults_initialized) {
        /* Initialize storage. */
        node_state.storage = fn.init_storage(allocator);

        /* Load unlinked inputs. */
        for (const int input_index : node.inputs().index_range()) {
          const InputSocket &input_socket = node.input(input_index);
          if (input_socket.origin() != nullptr) {
            continue;
          }
          InputState &input_state = node_state.inputs[input_index];
          const CPPType &type = input_socket.type();
          const void *default_value = input_socket.default_value();
          BLI_assert(default_value != nullptr);
          if (self_.logger_ != nullptr) {
            self_.logger_->log_socket_value(input_socket, {type, default_value}, local_context);
          }
          BLI_assert(input_state.value == nullptr);
          input_state.value = allocator.allocate(type);
          type.copy_construct(default_value, input_state.value);
          input_state.was_ready_for_execution = true;
        }

        node_state.storage_and_defaults_initialized = true;
      }

      /* Importantly, the node must not be locked while it is executed. Otherwise, locks would be
       * held for a long time in some cases, and the same thread could end up holding multiple
       * locks in the same graph, which can lead to deadlocks. */
      this->execute_node(node, node_state, current_task, local_data);
    }

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
#ifndef NDEBUG
          if (node_needs_execution) {
            this->assert_expected_outputs_have_been_computed(locked_node, local_data);
          }
#endif
          this->finish_node_if_possible(locked_node);
          const bool reschedule_requested = node_state.schedule_state ==
                                            NodeScheduleState::RunningAndRescheduled;
          node_state.schedule_state = NodeScheduleState::NotScheduled;
          if (reschedule_requested && !node_state.node_has_finished) {
            this->schedule_node(locked_node, current_task, false);
          }
        });
  }

  void assert_expected_outputs_have_been_computed(LockedNode &locked_node,
                                                  const LocalData &local_data)
  {
    const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
    const NodeState &node_state = locked_node.node_state;

    if (node_state.missing_required_inputs > 0) {
      return;
    }
    if (node_state.schedule_state == NodeScheduleState::RunningAndRescheduled) {
      return;
    }
    Vector<const OutputSocket *> missing_outputs;
    for (const int i : node.outputs().index_range()) {
      const OutputState &output_state = node_state.outputs[i];
      if (output_state.usage_for_execution == ValueUsage::Used) {
        if (!output_state.has_been_computed) {
          missing_outputs.append(&node.output(i));
        }
      }
    }
    if (!missing_outputs.is_empty()) {
      if (self_.logger_ != nullptr) {
        const Context context{context_->storage, context_->user_data, local_data.local_user_data};
        self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, context);
      }
      BLI_assert_unreachable();
    }
  }

  void finish_node_if_possible(LockedNode &locked_node)
  {
    const Node &node = locked_node.node;
    NodeState &node_state = locked_node.node_state;

    if (node_state.node_has_finished) {
      /* Was finished already. */
      return;
    }
    /* If there are outputs that may still be used, the node is not done yet. */
    for (const int output_index : node.outputs().index_range()) {
      const OutputState &output_state = node_state.outputs[output_index];
      if (output_state.usage != ValueUsage::Unused && !output_state.has_been_computed) {
        return;
      }
    }
    /* If the node is still waiting for inputs, it is not done yet. */
    for (const int input_index : node.inputs().index_range()) {
      const InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Used && !input_state.was_ready_for_execution) {
        return;
      }
    }

    node_state.node_has_finished = true;

    for (const int input_index : node.inputs().index_range()) {
      const InputSocket &input_socket = node.input(input_index);
      InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Maybe) {
        this->set_input_unused(locked_node, input_socket);
      }
      else if (input_state.usage == ValueUsage::Used) {
        this->destruct_input_value_if_exists(input_state, input_socket.type());
      }
    }

    if (node_state.storage != nullptr) {
      if (node.is_function()) {
        const FunctionNode &fn_node = static_cast<const FunctionNode &>(node);
        fn_node.function().destruct_storage(node_state.storage);
      }
      node_state.storage = nullptr;
    }
  }

  void destruct_input_value_if_exists(InputState &input_state, const CPPType &type)
  {
    if (input_state.value != nullptr) {
      type.destruct(input_state.value);
      input_state.value = nullptr;
    }
  }

  void execute_node(const FunctionNode &node,
                    NodeState &node_state,
                    CurrentTask &current_task,
                    const LocalData &local_data);

  void set_input_unused_during_execution(const Node &node,
                                         NodeState &node_state,
                                         const int input_index,
                                         CurrentTask &current_task,
                                         const LocalData &local_data)
  {
    const InputSocket &input_socket = node.input(input_index);
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          this->set_input_unused(locked_node, input_socket);
        });
  }

  void set_input_unused(LockedNode &locked_node, const InputSocket &input_socket)
  {
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Used);
    if (input_state.usage == ValueUsage::Unused) {
      return;
    }
    input_state.usage = ValueUsage::Unused;

    this->destruct_input_value_if_exists(input_state, input_socket.type());
    if (input_state.was_ready_for_execution) {
      return;
    }
    const OutputSocket *origin = input_socket.origin();
    if (origin != nullptr) {
      locked_node.delayed_unused_outputs.append(origin);
    }
  }

  void *set_input_required_during_execution(const Node &node,
                                            NodeState &node_state,
                                            const int input_index,
                                            CurrentTask &current_task,
                                            const LocalData &local_data)
  {
    const InputSocket &input_socket = node.input(input_index);
    void *result;
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          result = this->set_input_required(locked_node, input_socket);
        });
    return result;
  }

  void *set_input_required(LockedNode &locked_node, const InputSocket &input_socket)
  {
    BLI_assert(&locked_node.node == &input_socket.node());
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Unused);

    if (input_state.value != nullptr) {
      input_state.was_ready_for_execution = true;
      return input_state.value;
    }
    if (input_state.usage == ValueUsage::Used) {
      return nullptr;
    }
    input_state.usage = ValueUsage::Used;
    node_state.missing_required_inputs += 1;

    const OutputSocket *origin_socket = input_socket.origin();
    /* Unlinked inputs are always loaded in advance. */
    BLI_assert(origin_socket != nullptr);
    locked_node.delayed_required_outputs.append(origin_socket);
    return nullptr;
  }

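  /* Forwards a computed value to all inputs linked to #from_socket. Intermediate targets receive
   * copies; the last target consumes the value by move, so n linked targets cost at most n - 1
   * copies. If no target ends up consuming the value, it is destructed at the end. */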
  void forward_value_to_linked_inputs(const OutputSocket &from_socket,
                                      GMutablePointer value_to_forward,
                                      CurrentTask &current_task,
                                      const LocalData &local_data)
  {
    BLI_assert(value_to_forward.get() != nullptr);
    const CPPType &type = *value_to_forward.type();
    const Context local_context{
        context_->storage, context_->user_data, local_data.local_user_data};

    if (self_.logger_ != nullptr) {
      self_.logger_->log_socket_value(from_socket, value_to_forward, local_context);
    }

    const Span<const InputSocket *> targets = from_socket.targets();
    for (const InputSocket *target_socket : targets) {
      const Node &target_node = target_socket->node();
      NodeState &node_state = *node_states_[target_node.index_in_graph()];
      const int input_index = target_socket->index();
      InputState &input_state = node_state.inputs[input_index];
      const bool is_last_target = target_socket == targets.last();
#ifndef NDEBUG
      if (input_state.value != nullptr) {
        if (self_.logger_ != nullptr) {
          self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, local_context);
        }
        BLI_assert_unreachable();
      }
#endif
      BLI_assert(!input_state.was_ready_for_execution);
      BLI_assert(target_socket->type() == type);
      BLI_assert(target_socket->origin() == &from_socket);

      if (self_.logger_ != nullptr) {
        self_.logger_->log_socket_value(*target_socket, value_to_forward, local_context);
      }
      if (target_node.is_interface()) {
        /* Forward the value to the outside of the graph. */
        const int graph_output_index =
            self_.graph_output_index_by_socket_index_[target_socket->index()];
        if (graph_output_index != -1 &&
            params_->get_output_usage(graph_output_index) != ValueUsage::Unused)
        {
          void *dst_buffer = params_->get_output_data_ptr(graph_output_index);
          if (is_last_target) {
            type.move_construct(value_to_forward.get(), dst_buffer);
          }
          else {
            type.copy_construct(value_to_forward.get(), dst_buffer);
          }
          params_->output_set(graph_output_index);
        }
        continue;
      }
      this->with_locked_node(
          target_node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            if (input_state.usage == ValueUsage::Unused) {
              return;
            }
            if (is_last_target) {
              /* No need to make a copy if this is the last target. */
              this->forward_value_to_input(
                  locked_node, input_state, value_to_forward, current_task);
              value_to_forward = {};
            }
            else {
              void *buffer = local_data.allocator->allocate(type);
              type.copy_construct(value_to_forward.get(), buffer);
              this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
            }
          });
    }
    if (value_to_forward.get() != nullptr) {
      value_to_forward.destruct();
    }
  }

  void forward_value_to_input(LockedNode &locked_node,
                              InputState &input_state,
                              GMutablePointer value,
                              CurrentTask &current_task)
  {
    NodeState &node_state = locked_node.node_state;

    BLI_assert(input_state.value == nullptr);
    BLI_assert(!input_state.was_ready_for_execution);
    input_state.value = value.get();

    if (input_state.usage == ValueUsage::Used) {
      node_state.missing_required_inputs -= 1;
      if (node_state.missing_required_inputs == 0 ||
          (locked_node.node.is_function() && static_cast<const FunctionNode &>(locked_node.node)
                                                 .function()
                                                 .allow_missing_requested_inputs()))
      {
        this->schedule_node(locked_node, current_task, false);
      }
    }
  }

  bool use_multi_threading() const
  {
    return task_pool_.load() != nullptr;
  }

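  /* Tries to switch the executor from single-threaded to multi-threaded mode by lazily creating
   * the task pool. This can fail, e.g. when the caller does not support multi-threading or when
   * only a single thread is available. */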
  bool try_enable_multi_threading()
  {
#ifndef WITH_TBB
    /* The non-TBB task pool has the property that it immediately executes tasks under some
     * circumstances. This is not supported here because tasks might be scheduled while another
     * node is in the middle of being executed on the same thread. */
    return false;
#endif
    if (this->use_multi_threading()) {
      return true;
    }
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    /* Only the current main thread is allowed to enable multi-threading, because the executor is
     * still in single-threaded mode. */
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    /* Check if the caller supports multi-threading. */
    if (!params_->try_enable_multi_threading()) {
      return false;
    }
    /* Avoid using multiple threads when only one thread can be used anyway. */
    if (BLI_system_thread_count() <= 1) {
      return false;
    }
    this->ensure_thread_locals();
    task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
    return true;
  }

  void ensure_thread_locals()
  {
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    if (!thread_locals_) {
      thread_locals_ = std::make_unique<threading::EnumerableThreadSpecific<ThreadLocalStorage>>();
    }
  }

  /**
   * Moves all currently scheduled nodes to the task pool, so that they can be picked up by other
   * threads (e.g. while this thread is blocked in a long-running node).
   */
  void push_all_scheduled_nodes_to_task_pool(CurrentTask &current_task)
  {
    BLI_assert(this->use_multi_threading());
    std::unique_ptr<ScheduledNodes> scheduled_nodes = std::make_unique<ScheduledNodes>();
    {
      std::lock_guard lock{current_task.mutex};
      if (current_task.scheduled_nodes.is_empty()) {
        return;
      }
      *scheduled_nodes = std::move(current_task.scheduled_nodes);
      current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
    }
    this->push_to_task_pool(std::move(scheduled_nodes));
  }

  void push_to_task_pool(std::unique_ptr<ScheduledNodes> scheduled_nodes)
  {
    /* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
     * overhead when the nodes are fast to compute. */
    BLI_task_pool_push(
        task_pool_.load(),
        [](TaskPool *pool, void *data) {
          Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
          ScheduledNodes &scheduled_nodes = *static_cast<ScheduledNodes *>(data);
          CurrentTask new_current_task;
          new_current_task.scheduled_nodes = std::move(scheduled_nodes);
          new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
          const LocalData local_data = executor.get_local_data();
          executor.run_task(new_current_task, local_data);
        },
        scheduled_nodes.release(),
        true,
        [](TaskPool * /*pool*/, void *data) { delete static_cast<ScheduledNodes *>(data); });
  }

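  /* Returns the allocator and local user data for the current thread. In single-threaded mode
   * the main allocator is used directly; otherwise thread-local storage is created on demand. */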
  LocalData get_local_data()
  {
    if (!this->use_multi_threading()) {
      return {&main_allocator_, context_->local_user_data};
    }
    ThreadLocalStorage &local_storage = thread_locals_->local();
    if (!local_storage.local_user_data.has_value()) {
      local_storage.local_user_data = context_->user_data->get_local(local_storage.allocator);
    }
    return {&local_storage.allocator, local_storage.local_user_data->get()};
  }
};

class GraphExecutorLFParams final : public Params {
 private:
  Executor &executor_;
  const Node &node_;
  NodeState &node_state_;
  CurrentTask &current_task_;
  /** Local data of the thread that called or is executing the node. */
  const Executor::LocalData &caller_local_data_;

 public:
  GraphExecutorLFParams(const LazyFunction &fn,
                        Executor &executor,
                        const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task,
                        const Executor::LocalData &local_data)
      : Params(fn, node_state.enabled_multi_threading),
        executor_(executor),
        node_(node),
        node_state_(node_state),
        current_task_(current_task),
        caller_local_data_(local_data)
  {
  }

 private:
  Executor::LocalData get_local_data()
  {
    if (!node_state_.enabled_multi_threading) {
      /* Can use the thread-local data of the calling thread. */
      return caller_local_data_;
    }
    /* Need to retrieve the thread-local data for the current thread. */
    return executor_.get_local_data();
  }

  void *try_get_input_data_ptr_impl(const int index) const override
  {
    const InputState &input_state = node_state_.inputs[index];
    if (input_state.was_ready_for_execution) {
      return input_state.value;
    }
    return nullptr;
  }

  void *try_get_input_data_ptr_or_request_impl(const int index) override
  {
    const InputState &input_state = node_state_.inputs[index];
    if (input_state.was_ready_for_execution) {
      return input_state.value;
    }
    return executor_.set_input_required_during_execution(
        node_, node_state_, index, current_task_, this->get_local_data());
  }

  void *get_output_data_ptr_impl(const int index) override
  {
    OutputState &output_state = node_state_.outputs[index];
    BLI_assert(!output_state.has_been_computed);
    if (output_state.value == nullptr) {
      LinearAllocator<> &allocator = *this->get_local_data().allocator;
      const CPPType &type = node_.output(index).type();
      output_state.value = allocator.allocate(type);
    }
    return output_state.value;
  }

  void output_set_impl(const int index) override
  {
    OutputState &output_state = node_state_.outputs[index];
    BLI_assert(!output_state.has_been_computed);
    BLI_assert(output_state.value != nullptr);
    const OutputSocket &output_socket = node_.output(index);
    executor_.forward_value_to_linked_inputs(output_socket,
                                             {output_socket.type(), output_state.value},
                                             current_task_,
                                             this->get_local_data());
    output_state.value = nullptr;
    output_state.has_been_computed = true;
  }

  bool output_was_set_impl(const int index) const override
  {
    const OutputState &output_state = node_state_.outputs[index];
    return output_state.has_been_computed;
  }

  ValueUsage get_output_usage_impl(const int index) const override
  {
    const OutputState &output_state = node_state_.outputs[index];
    return output_state.usage_for_execution;
  }

  void set_input_unused_impl(const int index) override
  {
    executor_.set_input_unused_during_execution(
        node_, node_state_, index, current_task_, this->get_local_data());
  }

  bool try_enable_multi_threading_impl() override
  {
    const bool success = executor_.try_enable_multi_threading();
    if (success) {
      node_state_.enabled_multi_threading = true;
    }
    return success;
  }
};
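
/* Design note: #GraphExecutorLFParams adapts per-node state to the generic #Params interface that
 * #LazyFunction::execute consumes, so a lazy function can read inputs and publish outputs without
 * knowing that it runs as part of a graph. */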

/**
 * Runs the actual function of the node. Must be called while the node is not locked, because
 * execution can take a long time and may lock other nodes.
 */
inline void Executor::execute_node(const FunctionNode &node,
                                   NodeState &node_state,
                                   CurrentTask &current_task,
                                   const LocalData &local_data)
{
  const LazyFunction &fn = node.function();
  GraphExecutorLFParams node_params{fn, *this, node, node_state, current_task, local_data};

  Context fn_context(node_state.storage, context_->user_data, local_data.local_user_data);

  if (self_.logger_ != nullptr) {
    self_.logger_->log_before_node_execute(node, node_params, fn_context);
  }

  /* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate that
   * the execution will take a while. In this case, other tasks waiting on this thread should be
   * allowed to be picked up by another thread. */
  auto blocking_hint_fn = [&]() {
    if (!current_task.has_scheduled_nodes.load()) {
      return;
    }
    if (!this->try_enable_multi_threading()) {
      return;
    }
    this->push_all_scheduled_nodes_to_task_pool(current_task);
  };

  lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};
  if (self_.node_execute_wrapper_) {
    self_.node_execute_wrapper_->execute_node(node, node_params, fn_context);
  }
  else {
    fn.execute(node_params, fn_context);
  }

  if (self_.logger_ != nullptr) {
    self_.logger_->log_after_node_execute(node, node_params, fn_context);
  }
}

GraphExecutor::GraphExecutor(const Graph &graph,
                             const Logger *logger,
                             const SideEffectProvider *side_effect_provider,
                             const NodeExecuteWrapper *node_execute_wrapper)
    : GraphExecutor(graph,
                    Vector<const GraphInputSocket *>(graph.graph_inputs()),
                    Vector<const GraphOutputSocket *>(graph.graph_outputs()),
                    logger,
                    side_effect_provider,
                    node_execute_wrapper)
{
}

GraphExecutor::GraphExecutor(const Graph &graph,
                             Vector<const GraphInputSocket *> graph_inputs,
                             Vector<const GraphOutputSocket *> graph_outputs,
                             const Logger *logger,
                             const SideEffectProvider *side_effect_provider,
                             const NodeExecuteWrapper *node_execute_wrapper)
    : graph_(graph),
      graph_inputs_(std::move(graph_inputs)),
      graph_outputs_(std::move(graph_outputs)),
      graph_input_index_by_socket_index_(graph.graph_inputs().size(), -1),
      graph_output_index_by_socket_index_(graph.graph_outputs().size(), -1),
      logger_(logger),
      side_effect_provider_(side_effect_provider),
      node_execute_wrapper_(node_execute_wrapper)
{
  debug_name_ = graph.name().c_str();

  /* The graph executor can handle partial execution when there are still missing inputs. */
  allow_missing_requested_inputs_ = true;

  for (const int i : graph_inputs_.index_range()) {
    const OutputSocket &socket = *graph_inputs_[i];
    BLI_assert(socket.node().is_interface());
    inputs_.append({"In", socket.type(), ValueUsage::Maybe});
    graph_input_index_by_socket_index_[socket.index()] = i;
  }
  for (const int i : graph_outputs_.index_range()) {
    const InputSocket &socket = *graph_outputs_[i];
    BLI_assert(socket.node().is_interface());
    outputs_.append({"Out", socket.type()});
    graph_output_index_by_socket_index_[socket.index()] = i;
  }

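  /* Layout sketch for the single preallocated buffer (derived from the offsets computed below):
   * first the array of NodeState pointers, then the loaded-input flags, then for every node its
   * NodeState followed by its InputState and OutputState arrays. The static_asserts below ensure
   * that no extra padding is needed between the per-node pieces. */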
  /* Preprocess buffer offsets. */
  int offset = 0;
  const Span<const Node *> nodes = graph_.nodes();
  init_buffer_info_.node_states_array_offset = offset;
  offset += sizeof(NodeState *) * nodes.size();
  init_buffer_info_.loaded_inputs_array_offset = offset;
  offset += sizeof(std::atomic<uint8_t>) * graph_inputs_.size();
  /* Round the offset up to the next multiple of the pointer size. */
  offset = (offset + sizeof(void *) - 1) & ~(sizeof(void *) - 1);

  init_buffer_info_.node_states_offsets.reinitialize(graph_.nodes().size());
  for (const int i : nodes.index_range()) {
    const Node &node = *nodes[i];
    init_buffer_info_.node_states_offsets[i] = offset;
    offset += sizeof(NodeState);
    offset += sizeof(InputState) * node.inputs().size();
    offset += sizeof(OutputState) * node.outputs().size();
    /* Make sure we don't have to worry about alignment. */
    static_assert(sizeof(NodeState) % sizeof(void *) == 0);
    static_assert(sizeof(InputState) % sizeof(void *) == 0);
    static_assert(sizeof(OutputState) % sizeof(void *) == 0);
  }

  init_buffer_info_.total_size = offset;
}

void GraphExecutor::execute_impl(Params &params, const Context &context) const
{
  Executor &executor = *static_cast<Executor *>(context.storage);
  executor.execute(params, context);
}

void *GraphExecutor::init_storage(LinearAllocator<> &allocator) const
{
  Executor &executor = *allocator.construct<Executor>(*this).release();
  return &executor;
}

void GraphExecutor::destruct_storage(void *storage) const
{
  std::destroy_at(static_cast<Executor *>(storage));
}

std::string GraphExecutor::input_name(const int index) const
{
  const lf::OutputSocket &socket = *graph_inputs_[index];
  return socket.name();
}

std::string GraphExecutor::output_name(const int index) const
{
  const lf::InputSocket &socket = *graph_outputs_[index];
  return socket.name();
}

void GraphExecutor::Logger::log_socket_value(const Socket &socket,
                                             const GPointer value,
                                             const Context &context) const
{
  UNUSED_VARS(socket, value, context);
}

void GraphExecutor::Logger::log_before_node_execute(const FunctionNode &node,
                                                    const Params &params,
                                                    const Context &context) const
{
  UNUSED_VARS(node, params, context);
}

void GraphExecutor::Logger::log_after_node_execute(const FunctionNode &node,
                                                   const Params &params,
                                                   const Context &context) const
{
  UNUSED_VARS(node, params, context);
}

Vector<const FunctionNode *> GraphExecutor::SideEffectProvider::get_nodes_with_side_effects(
    const Context &context) const
{
  UNUSED_VARS(context);
  return {};
}

void GraphExecutor::Logger::dump_when_outputs_are_missing(const FunctionNode &node,
                                                          Span<const OutputSocket *> missing_sockets,
                                                          const Context &context) const
{
  UNUSED_VARS(node, missing_sockets, context);
}

void GraphExecutor::Logger::dump_when_input_is_set_twice(const InputSocket &target_socket,
                                                         const OutputSocket &from_socket,
                                                         const Context &context) const
{
  UNUSED_VARS(target_socket, from_socket, context);
}

} // namespace blender::fn::lazy_function