Blender V4.3
lazy_function_graph_executor.cc
/* SPDX-FileCopyrightText: 2023 Blender Authors
 *
 * SPDX-License-Identifier: GPL-2.0-or-later */

#include <mutex>
#include <sstream>

#include "BLI_enumerable_thread_specific.hh"
#include "BLI_function_ref.hh"
#include "BLI_task.h"
#include "BLI_task.hh"
#include "BLI_timeit.hh"
#include "FN_lazy_function_graph_executor.hh"

namespace blender::fn::lazy_function {

struct InputState {
  /* Value of this input socket; null while the value has not been forwarded yet. */
  void *value = nullptr;
  /* How the node expects to use this input. */
  ValueUsage usage = ValueUsage::Maybe;
  /* True when the value was available before the current node execution started. */
  bool was_ready_for_execution = false;
};

struct OutputState {
  /* How the targets of this output expect to use it. */
  ValueUsage usage = ValueUsage::Maybe;
  /* Snapshot of #usage that stays fixed for the duration of a single node execution. */
  ValueUsage usage_for_execution = ValueUsage::Maybe;
  /* Number of linked input sockets that might still use this output. */
  int potential_target_sockets = 0;
  bool has_been_computed = false;
  /* Buffer for the computed value; null when not currently allocated. */
  void *value = nullptr;
};

enum class NodeScheduleState {
  NotScheduled,
  Scheduled,
  Running,
  RunningAndRescheduled,
};

struct NodeState {
  /* Protects the mutable state below once multi-threading is enabled. */
  mutable std::mutex mutex;
  InputState *inputs;
  OutputState *outputs;
  int missing_required_inputs = 0;
  bool node_has_finished = false;
  bool always_used_inputs_requested = false;
  bool storage_and_defaults_initialized = false;
  bool has_side_effects = false;
  bool enabled_multi_threading = false;
  NodeScheduleState schedule_state = NodeScheduleState::NotScheduled;
  /* Opaque storage created by #LazyFunction::init_storage. */
  void *storage = nullptr;
};

struct LockedNode {
  const Node &node;
  NodeState &node_state;
  /* Notifications that have to be sent after the node lock has been released. */
  Vector<const OutputSocket *> delayed_required_outputs;
  Vector<const OutputSocket *> delayed_unused_outputs;

  LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state) {}
};

class Executor;
class GraphExecutorLFParams;

/* Queue of nodes that are scheduled but not running yet. Priority nodes are mainly used to free
 * values that are not needed anymore as early as possible. */
class ScheduledNodes {
 private:
  Vector<const FunctionNode *> priority_;
  Vector<const FunctionNode *> normal_;

 public:
  void schedule(const FunctionNode &node, const bool is_priority)
  {
    if (is_priority) {
      this->priority_.append(&node);
    }
    else {
      this->normal_.append(&node);
    }
  }

  const FunctionNode *pop_next_node()
  {
    if (!this->priority_.is_empty()) {
      return this->priority_.pop_last();
    }
    if (!this->normal_.is_empty()) {
      return this->normal_.pop_last();
    }
    return nullptr;
  }

  bool is_empty() const
  {
    return this->priority_.is_empty() && this->normal_.is_empty();
  }

  int64_t nodes_num() const
  {
    return priority_.size() + normal_.size();
  }

  /* Move half of the scheduled nodes into another container, so that a second thread can work on
   * them. */
  void split_into(ScheduledNodes &other)
  {
    BLI_assert(this != &other);
    const int64_t priority_split = priority_.size() / 2;
    const int64_t normal_split = normal_.size() / 2;
    other.priority_.extend(priority_.as_span().drop_front(priority_split));
    other.normal_.extend(normal_.as_span().drop_front(normal_split));
    priority_.resize(priority_split);
    normal_.resize(normal_split);
  }
};
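/* A #CurrentTask is the unit of work that a single thread processes at a time. Once
 * multi-threading is enabled, #mutex has to be locked when accessing #scheduled_nodes, while
 * #has_scheduled_nodes allows a cheap check without taking the lock. */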
struct CurrentTask {
  std::mutex mutex;
  ScheduledNodes scheduled_nodes;
  std::atomic<bool> has_scheduled_nodes = false;
};

class Executor {
 private:
  const GraphExecutor &self_;
  /* Remembers which graph inputs have been loaded from the caller already. */
  MutableSpan<std::atomic<uint8_t>> loaded_inputs_;
  /* State of every node, indexed by the node's index in the graph. */
  MutableSpan<NodeState *> node_states_;
  /* Parameters and context of the current execution; only valid while #execute runs. */
  Params *params_ = nullptr;
  const Context *context_ = nullptr;
  /* Task pool that is created lazily once multi-threading is enabled. */
  std::atomic<TaskPool *> task_pool_ = nullptr;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
  std::thread::id current_main_thread_;
#endif

  struct ThreadLocalStorage {
    LinearAllocator<> allocator;
    std::optional<destruct_ptr<LocalUserData>> local_user_data;
  };
  std::unique_ptr<threading::EnumerableThreadSpecific<ThreadLocalStorage>> thread_locals_;
  LinearAllocator<> main_allocator_;
  bool is_first_execution_ = true;

  friend GraphExecutorLFParams;

  /* Allocator and user data to use on the current thread. */
  struct LocalData {
    LinearAllocator<> *allocator;
    LocalUserData *local_user_data;
  };

 public:
  Executor(const GraphExecutor &self) : self_(self)
  {
    /* The indices are necessary, because they are used as keys in #node_states_. */
    BLI_assert(self_.graph_.node_indices_are_valid());
  }

  ~Executor()
  {
    if (TaskPool *task_pool = task_pool_.load()) {
      BLI_task_pool_free(task_pool);
    }
    threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
      for (const int node_index : range) {
        const Node &node = *self_.graph_.nodes()[node_index];
        NodeState &node_state = *node_states_[node_index];
        this->destruct_node_state(node, node_state);
      }
    });
  }

  /* Main entry point. It is called whenever the caller provides new inputs or requires
   * additional outputs. */
  void execute(Params &params, const Context &context)
  {
    params_ = &params;
    context_ = &context;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    current_main_thread_ = std::this_thread::get_id();
#endif
    const auto deferred_func = [&]() {
      /* Make sure the pointers are not dangling, even though they shouldn't be accessed by
       * anyone. */
      params_ = nullptr;
      context_ = nullptr;
      is_first_execution_ = false;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
      current_main_thread_ = {};
#endif
    };
    BLI_SCOPED_DEFER(deferred_func);

    const LocalData local_data = this->get_local_data();

    CurrentTask current_task;
    if (is_first_execution_) {
      /* Allocate a single large buffer instead of making many smaller allocations below. */
      char *buffer = static_cast<char *>(
          local_data.allocator->allocate(self_.init_buffer_info_.total_size, alignof(void *)));
      this->initialize_node_states(buffer);

      loaded_inputs_ = MutableSpan{
          reinterpret_cast<std::atomic<uint8_t> *>(
              buffer + self_.init_buffer_info_.loaded_inputs_array_offset),
          self_.graph_inputs_.size()};
      /* Initialize atomics to zero. */
      memset(static_cast<void *>(loaded_inputs_.data()),
             0,
             loaded_inputs_.size() * sizeof(std::atomic<uint8_t>));

      this->set_always_unused_graph_inputs();
      this->set_defaulted_graph_outputs(local_data);

      /* Retrieve and tag side effect nodes. */
      Vector<const FunctionNode *> side_effect_nodes;
      if (self_.side_effect_provider_ != nullptr) {
        side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(context);
        for (const FunctionNode *node : side_effect_nodes) {
          BLI_assert(self_.graph_.nodes().contains(node));
          const int node_index = node->index_in_graph();
          NodeState &node_state = *node_states_[node_index];
          node_state.has_side_effects = true;
        }
      }

      this->initialize_static_value_usages(side_effect_nodes);
      this->schedule_side_effect_nodes(side_effect_nodes, current_task, local_data);
    }

    this->schedule_for_new_output_usages(current_task, local_data);
    this->forward_newly_provided_inputs(current_task, local_data);

    this->run_task(current_task, local_data);

    if (TaskPool *task_pool = task_pool_.load()) {
      BLI_task_pool_work_and_wait(task_pool);
    }
  }

 private:
  void initialize_node_states(char *buffer)
  {
    Span<const Node *> nodes = self_.graph_.nodes();
    node_states_ = MutableSpan{
        reinterpret_cast<NodeState **>(buffer + self_.init_buffer_info_.node_states_array_offset),
        nodes.size()};

    threading::parallel_for(nodes.index_range(), 256, [&](const IndexRange range) {
      for (const int i : range) {
        const Node &node = *nodes[i];
        char *memory = buffer + self_.init_buffer_info_.node_states_offsets[i];

        /* Initialize node state. */
        NodeState *node_state = reinterpret_cast<NodeState *>(memory);
        memory += sizeof(NodeState);
        new (node_state) NodeState();

        /* Initialize socket states. */
        const int num_inputs = node.inputs().size();
        const int num_outputs = node.outputs().size();
        node_state->inputs = reinterpret_cast<InputState *>(memory);
        memory += sizeof(InputState) * num_inputs;
        node_state->outputs = reinterpret_cast<OutputState *>(memory);

        default_construct_n(node_state->inputs, num_inputs);
        default_construct_n(node_state->outputs, num_outputs);

        node_states_[i] = node_state;
      }
    });
  }

  void destruct_node_state(const Node &node, NodeState &node_state)
  {
    if (node.is_function()) {
      const LazyFunction &fn = static_cast<const FunctionNode &>(node).function();
      if (node_state.storage != nullptr) {
        fn.destruct_storage(node_state.storage);
      }
    }
    for (const int i : node.inputs().index_range()) {
      InputState &input_state = node_state.inputs[i];
      const InputSocket &input_socket = node.input(i);
      this->destruct_input_value_if_exists(input_state, input_socket.type());
    }
    std::destroy_at(&node_state);
  }

  /* When outputs that were not requested before become required, the nodes computing them have
   * to be scheduled; outputs that became unused are propagated as well. */
  void schedule_for_new_output_usages(CurrentTask &current_task, const LocalData &local_data)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      if (params_->output_was_set(graph_output_index)) {
        continue;
      }
      const ValueUsage output_usage = params_->get_output_usage(graph_output_index);
      if (output_usage == ValueUsage::Maybe) {
        continue;
      }
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      const Node &node = socket.node();
      NodeState &node_state = *node_states_[node.index_in_graph()];
      this->with_locked_node(
          node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            if (output_usage == ValueUsage::Used) {
              this->set_input_required(locked_node, socket);
            }
            else {
              this->set_input_unused(locked_node, socket);
            }
          });
    }
  }

  void set_defaulted_graph_outputs(const LocalData &local_data)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      if (socket.origin() != nullptr) {
        continue;
      }
      const CPPType &type = socket.type();
      const void *default_value = socket.default_value();
      BLI_assert(default_value != nullptr);

      if (self_.logger_ != nullptr) {
        const Context context{context_->storage, context_->user_data, local_data.local_user_data};
        self_.logger_->log_socket_value(socket, {type, default_value}, context);
      }

      void *output_ptr = params_->get_output_data_ptr(graph_output_index);
      type.copy_construct(default_value, output_ptr);
      params_->output_set(graph_output_index);
    }
  }

  void set_always_unused_graph_inputs()
  {
    for (const int i : self_.graph_inputs_.index_range()) {
      const OutputSocket &socket = *self_.graph_inputs_[i];
      const Node &node = socket.node();
      const NodeState &node_state = *node_states_[node.index_in_graph()];
      const OutputState &output_state = node_state.outputs[socket.index()];
      if (output_state.usage == ValueUsage::Unused) {
        params_->set_input_unused(i);
      }
    }
  }

  /* Compute, once per graph, how often each output socket can be used at most, taking into
   * account which nodes are even reachable from the graph outputs and side effect nodes. These
   * static usage counts do not change during execution. */
  void initialize_static_value_usages(const Span<const FunctionNode *> side_effect_nodes)
  {
    const Span<const Node *> all_nodes = self_.graph_.nodes();

    /* Used for a search through all nodes that outputs depend on. */
    Stack<const Node *> reachable_nodes_to_check;
    Array<bool> reachable_node_flags(all_nodes.size(), false);

    /* Graph outputs are always reachable. */
    for (const InputSocket *socket : self_.graph_outputs_) {
      const Node &node = socket->node();
      const int node_index = node.index_in_graph();
      if (!reachable_node_flags[node_index]) {
        reachable_node_flags[node_index] = true;
        reachable_nodes_to_check.push(&node);
      }
    }

    /* Side effect nodes are always reachable. */
    for (const FunctionNode *node : side_effect_nodes) {
      const int node_index = node->index_in_graph();
      reachable_node_flags[node_index] = true;
      reachable_nodes_to_check.push(node);
    }

    /* Tag every node that reachable nodes depend on using depth-first-search. */
    while (!reachable_nodes_to_check.is_empty()) {
      const Node &node = *reachable_nodes_to_check.pop();
      for (const InputSocket *input_socket : node.inputs()) {
        const OutputSocket *origin_socket = input_socket->origin();
        if (origin_socket != nullptr) {
          const Node &origin_node = origin_socket->node();
          const int origin_node_index = origin_node.index_in_graph();
          if (!reachable_node_flags[origin_node_index]) {
            reachable_node_flags[origin_node_index] = true;
            reachable_nodes_to_check.push(&origin_node);
          }
        }
      }
    }

    for (const int node_index : reachable_node_flags.index_range()) {
      const Node &node = *all_nodes[node_index];
      NodeState &node_state = *node_states_[node_index];
      const bool node_is_reachable = reachable_node_flags[node_index];
      if (node_is_reachable) {
        for (const int output_index : node.outputs().index_range()) {
          const OutputSocket &output_socket = node.output(output_index);
          OutputState &output_state = node_state.outputs[output_index];
          int use_count = 0;
          for (const InputSocket *target_socket : output_socket.targets()) {
            const Node &target_node = target_socket->node();
            const bool target_is_reachable = reachable_node_flags[target_node.index_in_graph()];
            /* Only count targets that are reachable. */
            if (target_is_reachable) {
              use_count++;
            }
          }
          output_state.potential_target_sockets = use_count;
          if (use_count == 0) {
            output_state.usage = ValueUsage::Unused;
          }
        }
      }
      else {
        /* Inputs of unreachable nodes are unused. */
        for (const int input_index : node.inputs().index_range()) {
          node_state.inputs[input_index].usage = ValueUsage::Unused;
        }
      }
    }
  }

  void schedule_side_effect_nodes(const Span<const FunctionNode *> side_effect_nodes,
                                  CurrentTask &current_task,
                                  const LocalData &local_data)
  {
    for (const FunctionNode *node : side_effect_nodes) {
      NodeState &node_state = *node_states_[node->index_in_graph()];
      this->with_locked_node(
          *node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            this->schedule_node(locked_node, current_task, false);
          });
    }
  }

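  /* Forwards every graph input that the caller has provided but that has not been forwarded into
   * the graph yet. The atomic fetch_or below makes sure that each input is forwarded exactly
   * once, even when several threads discover the same newly provided value concurrently. */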
  void forward_newly_provided_inputs(CurrentTask &current_task, const LocalData &local_data)
  {
    for (const int graph_input_index : self_.graph_inputs_.index_range()) {
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        continue;
      }
      void *input_data = params_->try_get_input_data_ptr(graph_input_index);
      if (input_data == nullptr) {
        continue;
      }
      if (was_loaded.fetch_or(1)) {
        /* The value was forwarded before. */
        continue;
      }
      this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
    }
  }

  void forward_newly_provided_input(CurrentTask &current_task,
                                    const LocalData &local_data,
                                    const int graph_input_index,
                                    void *input_data)
  {
    const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
    const CPPType &type = socket.type();
    void *buffer = local_data.allocator->allocate(type.size(), type.alignment());
    type.move_construct(input_data, buffer);
    this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task, local_data);
  }

  void notify_output_required(const OutputSocket &socket,
                              CurrentTask &current_task,
                              const LocalData &local_data)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    /* The notified output socket might be an input of the entire graph. In this case, notify the
     * caller that the input is required. */
    if (node.is_interface()) {
      const int graph_input_index = self_.graph_input_index_by_socket_index_[socket.index()];
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        return;
      }
      void *input_data = params_->try_get_input_data_ptr_or_request(graph_input_index);
      if (input_data == nullptr) {
        return;
      }
      if (was_loaded.fetch_or(1)) {
        /* The value was forwarded already. */
        return;
      }
      this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
      return;
    }

    BLI_assert(node.is_function());
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          if (output_state.usage == ValueUsage::Used) {
            return;
          }
          output_state.usage = ValueUsage::Used;
          this->schedule_node(locked_node, current_task, false);
        });
  }

  void notify_output_unused(const OutputSocket &socket,
                            CurrentTask &current_task,
                            const LocalData &local_data)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          output_state.potential_target_sockets -= 1;
          if (output_state.potential_target_sockets == 0) {
            BLI_assert(output_state.usage != ValueUsage::Unused);
            if (output_state.usage == ValueUsage::Maybe) {
              output_state.usage = ValueUsage::Unused;
              if (node.is_interface()) {
                const int graph_input_index =
                    self_.graph_input_index_by_socket_index_[socket.index()];
                params_->set_input_unused(graph_input_index);
              }
              else {
                /* Schedule as priority node. This allows freeing up memory earlier which results
                 * in better memory reuse and fewer implicit sharing copies. */
                this->schedule_node(locked_node, current_task, true);
              }
            }
          }
        });
  }

  void schedule_node(LockedNode &locked_node, CurrentTask &current_task, const bool is_priority)
  {
    BLI_assert(locked_node.node.is_function());
    switch (locked_node.node_state.schedule_state) {
      case NodeScheduleState::NotScheduled: {
        locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
        const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
        if (this->use_multi_threading()) {
          std::lock_guard lock{current_task.mutex};
          current_task.scheduled_nodes.schedule(node, is_priority);
        }
        else {
          current_task.scheduled_nodes.schedule(node, is_priority);
        }
        current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
        break;
      }
      case NodeScheduleState::Scheduled: {
        break;
      }
      case NodeScheduleState::Running: {
        locked_node.node_state.schedule_state = NodeScheduleState::RunningAndRescheduled;
        break;
      }
      case NodeScheduleState::RunningAndRescheduled: {
        break;
      }
    }
  }
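
  /* Scheduling state machine handled above, for reference:
   *   NotScheduled -> Scheduled: the node is pushed onto the queue.
   *   Running -> RunningAndRescheduled: the node is run again once its current run finishes.
   *   Scheduled and RunningAndRescheduled: the node is already pending; nothing to do. */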
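  /* Runs #f while the node is locked and afterwards sends the notifications that #f delayed.
   * Notifications are delayed because sending them directly would require locking other nodes
   * while this one is still locked, which could deadlock. */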
  void with_locked_node(const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task,
                        const LocalData &local_data,
                        const FunctionRef<void(LockedNode &)> f)
  {
    BLI_assert(&node_state == node_states_[node.index_in_graph()]);

    LockedNode locked_node{node, node_state};
    if (this->use_multi_threading()) {
      std::lock_guard lock{node_state.mutex};
      threading::isolate_task([&]() { f(locked_node); });
    }
    else {
      f(locked_node);
    }

    this->send_output_required_notifications(
        locked_node.delayed_required_outputs, current_task, local_data);
    this->send_output_unused_notifications(
        locked_node.delayed_unused_outputs, current_task, local_data);
  }

  void send_output_required_notifications(const Span<const OutputSocket *> sockets,
                                          CurrentTask &current_task,
                                          const LocalData &local_data)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_required(*socket, current_task, local_data);
    }
  }

  void send_output_unused_notifications(const Span<const OutputSocket *> sockets,
                                        CurrentTask &current_task,
                                        const LocalData &local_data)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_unused(*socket, current_task, local_data);
    }
  }

  void run_task(CurrentTask &current_task, const LocalData &local_data)
  {
    while (const FunctionNode *node = current_task.scheduled_nodes.pop_next_node()) {
      if (current_task.scheduled_nodes.is_empty()) {
        current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
      }
      this->run_node_task(*node, current_task, local_data);

      /* If there are many nodes scheduled at the same time, it's beneficial to let multiple
       * threads work on those. */
      if (current_task.scheduled_nodes.nodes_num() > 128) {
        if (this->try_enable_multi_threading()) {
          std::unique_ptr<ScheduledNodes> split_nodes = std::make_unique<ScheduledNodes>();
          current_task.scheduled_nodes.split_into(*split_nodes);
          this->push_to_task_pool(std::move(split_nodes));
        }
      }
    }
  }
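  /* Processes a single scheduled node: under the node lock it checks whether execution is
   * actually necessary and requests missing inputs; the function itself is then executed without
   * holding the lock; finally the node is rescheduled if that was requested while it ran. */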
  void run_node_task(const FunctionNode &node,
                     CurrentTask &current_task,
                     const LocalData &local_data)
  {
    NodeState &node_state = *node_states_[node.index_in_graph()];
    LinearAllocator<> &allocator = *local_data.allocator;
    Context local_context{context_->storage, context_->user_data, local_data.local_user_data};
    const LazyFunction &fn = node.function();

    bool node_needs_execution = false;
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
          node_state.schedule_state = NodeScheduleState::Running;

          if (node_state.node_has_finished) {
            return;
          }

          bool required_uncomputed_output_exists = false;
          for (const int output_index : node.outputs().index_range()) {
            OutputState &output_state = node_state.outputs[output_index];
            output_state.usage_for_execution = output_state.usage;
            if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
              required_uncomputed_output_exists = true;
            }
          }
          if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
            return;
          }

          if (!node_state.always_used_inputs_requested) {
            /* Request linked inputs that are always needed. */
            const Span<Input> fn_inputs = fn.inputs();
            for (const int input_index : fn_inputs.index_range()) {
              const Input &fn_input = fn_inputs[input_index];
              if (fn_input.usage == ValueUsage::Used) {
                const InputSocket &input_socket = node.input(input_index);
                if (input_socket.origin() != nullptr) {
                  this->set_input_required(locked_node, input_socket);
                }
              }
            }

            node_state.always_used_inputs_requested = true;
          }

          for (const int input_index : node.inputs().index_range()) {
            InputState &input_state = node_state.inputs[input_index];
            if (input_state.was_ready_for_execution) {
              continue;
            }
            if (input_state.value != nullptr) {
              input_state.was_ready_for_execution = true;
              continue;
            }
            if (!fn.allow_missing_requested_inputs()) {
              if (input_state.usage == ValueUsage::Used) {
                return;
              }
            }
          }

          node_needs_execution = true;
        });

    if (node_needs_execution) {
      if (!node_state.storage_and_defaults_initialized) {
        /* Initialize storage. */
        node_state.storage = fn.init_storage(allocator);

        /* Load unlinked inputs. */
        for (const int input_index : node.inputs().index_range()) {
          const InputSocket &input_socket = node.input(input_index);
          if (input_socket.origin() != nullptr) {
            continue;
          }
          InputState &input_state = node_state.inputs[input_index];
          const CPPType &type = input_socket.type();
          const void *default_value = input_socket.default_value();
          BLI_assert(default_value != nullptr);
          if (self_.logger_ != nullptr) {
            self_.logger_->log_socket_value(input_socket, {type, default_value}, local_context);
          }
          BLI_assert(input_state.value == nullptr);
          input_state.value = allocator.allocate(type.size(), type.alignment());
          type.copy_construct(default_value, input_state.value);
          input_state.was_ready_for_execution = true;
        }

        node_state.storage_and_defaults_initialized = true;
      }

      /* Importantly, the node must not be locked while it is executed. Otherwise, locks would be
       * held for a long time in some cases, and the same thread could end up holding multiple
       * locks in the same graph, which can lead to deadlocks. */
      this->execute_node(node, node_state, current_task, local_data);
    }

    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
#ifndef NDEBUG
          if (node_needs_execution) {
            this->assert_expected_outputs_have_been_computed(locked_node, local_data);
          }
#endif
          this->finish_node_if_possible(locked_node);
          const bool reschedule_requested = node_state.schedule_state ==
                                            NodeScheduleState::RunningAndRescheduled;
          node_state.schedule_state = NodeScheduleState::NotScheduled;
          if (reschedule_requested && !node_state.node_has_finished) {
            this->schedule_node(locked_node, current_task, false);
          }
        });
  }

  void assert_expected_outputs_have_been_computed(LockedNode &locked_node,
                                                  const LocalData &local_data)
  {
    const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
    const NodeState &node_state = locked_node.node_state;

    if (node_state.missing_required_inputs > 0) {
      return;
    }
    if (node_state.schedule_state == NodeScheduleState::RunningAndRescheduled) {
      return;
    }
    Vector<const OutputSocket *> missing_outputs;
    for (const int i : node.outputs().index_range()) {
      const OutputState &output_state = node_state.outputs[i];
      if (output_state.usage_for_execution == ValueUsage::Used) {
        if (!output_state.has_been_computed) {
          missing_outputs.append(&node.output(i));
        }
      }
    }
    if (!missing_outputs.is_empty()) {
      if (self_.logger_ != nullptr) {
        const Context context{context_->storage, context_->user_data, local_data.local_user_data};
        self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, context);
      }
      BLI_assert_unreachable();
    }
  }

  void finish_node_if_possible(LockedNode &locked_node)
  {
    const Node &node = locked_node.node;
    NodeState &node_state = locked_node.node_state;

    if (node_state.node_has_finished) {
      /* Was finished already. */
      return;
    }
    /* If there are outputs that may still be used, the node is not done yet. */
    for (const int output_index : node.outputs().index_range()) {
      const OutputState &output_state = node_state.outputs[output_index];
      if (output_state.usage != ValueUsage::Unused && !output_state.has_been_computed) {
        return;
      }
    }
    /* If the node is still waiting for inputs, it is not done yet. */
    for (const int input_index : node.inputs().index_range()) {
      const InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Used && !input_state.was_ready_for_execution) {
        return;
      }
    }

    node_state.node_has_finished = true;

    for (const int input_index : node.inputs().index_range()) {
      const InputSocket &input_socket = node.input(input_index);
      InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Maybe) {
        this->set_input_unused(locked_node, input_socket);
      }
      else if (input_state.usage == ValueUsage::Used) {
        this->destruct_input_value_if_exists(input_state, input_socket.type());
      }
    }

    if (node_state.storage != nullptr) {
      if (node.is_function()) {
        const FunctionNode &fn_node = static_cast<const FunctionNode &>(node);
        fn_node.function().destruct_storage(node_state.storage);
      }
      node_state.storage = nullptr;
    }
  }

  void destruct_input_value_if_exists(InputState &input_state, const CPPType &type)
  {
    if (input_state.value != nullptr) {
      type.destruct(input_state.value);
      input_state.value = nullptr;
    }
  }

  void execute_node(const FunctionNode &node,
                    NodeState &node_state,
                    CurrentTask &current_task,
                    const LocalData &local_data);

  void set_input_unused_during_execution(const Node &node,
                                         NodeState &node_state,
                                         const int input_index,
                                         CurrentTask &current_task,
                                         const LocalData &local_data)
  {
    const InputSocket &input_socket = node.input(input_index);
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          this->set_input_unused(locked_node, input_socket);
        });
  }

  void set_input_unused(LockedNode &locked_node, const InputSocket &input_socket)
  {
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Used);
    if (input_state.usage == ValueUsage::Unused) {
      return;
    }
    input_state.usage = ValueUsage::Unused;

    this->destruct_input_value_if_exists(input_state, input_socket.type());
    if (input_state.was_ready_for_execution) {
      return;
    }
    const OutputSocket *origin = input_socket.origin();
    if (origin != nullptr) {
      locked_node.delayed_unused_outputs.append(origin);
    }
  }

  void *set_input_required_during_execution(const Node &node,
                                            NodeState &node_state,
                                            const int input_index,
                                            CurrentTask &current_task,
                                            const LocalData &local_data)
  {
    const InputSocket &input_socket = node.input(input_index);
    void *result = nullptr;
    this->with_locked_node(
        node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
          result = this->set_input_required(locked_node, input_socket);
        });
    return result;
  }

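  /* Returns the input value if it is available already. Otherwise marks the input as used,
   * remembers the origin socket so that it can be requested once the node lock is released, and
   * returns null. */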
  void *set_input_required(LockedNode &locked_node, const InputSocket &input_socket)
  {
    BLI_assert(&locked_node.node == &input_socket.node());
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Unused);

    if (input_state.value != nullptr) {
      input_state.was_ready_for_execution = true;
      return input_state.value;
    }
    if (input_state.usage == ValueUsage::Used) {
      return nullptr;
    }
    input_state.usage = ValueUsage::Used;
    node_state.missing_required_inputs += 1;

    const OutputSocket *origin_socket = input_socket.origin();
    /* Unlinked inputs are always loaded in advance. */
    BLI_assert(origin_socket != nullptr);
    locked_node.delayed_required_outputs.append(origin_socket);
    return nullptr;
  }

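  /* Forwards the given value to all linked input sockets, copying it for every target except the
   * last one, which receives the original. Takes ownership of #value_to_forward; if no target
   * consumes it, it is destructed at the end. */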
  void forward_value_to_linked_inputs(const OutputSocket &from_socket,
                                      GMutablePointer value_to_forward,
                                      CurrentTask &current_task,
                                      const LocalData &local_data)
  {
    BLI_assert(value_to_forward.get() != nullptr);
    const CPPType &type = *value_to_forward.type();
    const Context local_context{
        context_->storage, context_->user_data, local_data.local_user_data};

    if (self_.logger_ != nullptr) {
      self_.logger_->log_socket_value(from_socket, value_to_forward, local_context);
    }

    const Span<const InputSocket *> targets = from_socket.targets();
    for (const InputSocket *target_socket : targets) {
      const Node &target_node = target_socket->node();
      NodeState &node_state = *node_states_[target_node.index_in_graph()];
      const int input_index = target_socket->index();
      InputState &input_state = node_state.inputs[input_index];
      const bool is_last_target = target_socket == targets.last();
#ifndef NDEBUG
      if (input_state.value != nullptr) {
        if (self_.logger_ != nullptr) {
          self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, local_context);
        }
        BLI_assert_unreachable();
      }
#endif
      BLI_assert(!input_state.was_ready_for_execution);
      BLI_assert(target_socket->type() == type);
      BLI_assert(target_socket->origin() == &from_socket);

      if (self_.logger_ != nullptr) {
        self_.logger_->log_socket_value(*target_socket, value_to_forward, local_context);
      }
      if (target_node.is_interface()) {
        /* Forward the value to the outside of the graph. */
        const int graph_output_index =
            self_.graph_output_index_by_socket_index_[target_socket->index()];
        if (graph_output_index != -1 &&
            params_->get_output_usage(graph_output_index) != ValueUsage::Unused)
        {
          void *dst_buffer = params_->get_output_data_ptr(graph_output_index);
          if (is_last_target) {
            type.move_construct(value_to_forward.get(), dst_buffer);
          }
          else {
            type.copy_construct(value_to_forward.get(), dst_buffer);
          }
          params_->output_set(graph_output_index);
        }
        continue;
      }
      this->with_locked_node(
          target_node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
            if (input_state.usage == ValueUsage::Unused) {
              return;
            }
            if (is_last_target) {
              /* No need to make a copy if this is the last target. */
              this->forward_value_to_input(
                  locked_node, input_state, value_to_forward, current_task);
              value_to_forward = {};
            }
            else {
              void *buffer = local_data.allocator->allocate(type.size(), type.alignment());
              type.copy_construct(value_to_forward.get(), buffer);
              this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
            }
          });
    }
    if (value_to_forward.get() != nullptr) {
      value_to_forward.destruct();
    }
  }

  void forward_value_to_input(LockedNode &locked_node,
                              InputState &input_state,
                              GMutablePointer value,
                              CurrentTask &current_task)
  {
    NodeState &node_state = locked_node.node_state;

    BLI_assert(input_state.value == nullptr);
    BLI_assert(!input_state.was_ready_for_execution);
    input_state.value = value.get();

    if (input_state.usage == ValueUsage::Used) {
      node_state.missing_required_inputs -= 1;
      if (node_state.missing_required_inputs == 0 ||
          (locked_node.node.is_function() && static_cast<const FunctionNode &>(locked_node.node)
                                                 .function()
                                                 .allow_missing_requested_inputs()))
      {
        this->schedule_node(locked_node, current_task, false);
      }
    }
  }

  bool use_multi_threading() const
  {
    return task_pool_.load() != nullptr;
  }

  bool try_enable_multi_threading()
  {
#ifndef WITH_TBB
    /* The non-TBB task pool has the property that it immediately executes tasks under some
     * circumstances. This is not supported here because tasks might be scheduled while another
     * node is in the middle of being executed on the same thread. */
    return false;
#endif
    if (this->use_multi_threading()) {
      return true;
    }
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    /* Only the current main thread is allowed to enable multi-threading, because the executor is
     * still in single-threaded mode. */
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    /* Check if the caller supports multi-threading. */
    if (!params_->try_enable_multi_threading()) {
      return false;
    }
    /* Avoid using multiple threads when only one thread can be used anyway. */
    if (BLI_system_thread_count() <= 1) {
      return false;
    }
    this->ensure_thread_locals();
    task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
    return true;
  }

  void ensure_thread_locals()
  {
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    if (!thread_locals_) {
      thread_locals_ = std::make_unique<threading::EnumerableThreadSpecific<ThreadLocalStorage>>();
    }
  }

  /* Hands all currently scheduled nodes over to the task pool, so that other threads can pick
   * them up while this thread is blocked. */
  void push_all_scheduled_nodes_to_task_pool(CurrentTask &current_task)
  {
    BLI_assert(this->use_multi_threading());
    std::unique_ptr<ScheduledNodes> scheduled_nodes = std::make_unique<ScheduledNodes>();
    {
      std::lock_guard lock{current_task.mutex};
      if (current_task.scheduled_nodes.is_empty()) {
        return;
      }
      *scheduled_nodes = std::move(current_task.scheduled_nodes);
      current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
    }
    this->push_to_task_pool(std::move(scheduled_nodes));
  }

  void push_to_task_pool(std::unique_ptr<ScheduledNodes> scheduled_nodes)
  {
    /* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
     * overhead when the nodes are fast to compute. */
    BLI_task_pool_push(
        task_pool_.load(),
        [](TaskPool *pool, void *data) {
          Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
          ScheduledNodes &scheduled_nodes = *static_cast<ScheduledNodes *>(data);
          CurrentTask new_current_task;
          new_current_task.scheduled_nodes = std::move(scheduled_nodes);
          new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
          const LocalData local_data = executor.get_local_data();
          executor.run_task(new_current_task, local_data);
        },
        scheduled_nodes.release(),
        true,
        [](TaskPool * /*pool*/, void *data) { delete static_cast<ScheduledNodes *>(data); });
  }

  LocalData get_local_data()
  {
    if (!this->use_multi_threading()) {
      return {&main_allocator_, context_->local_user_data};
    }
    ThreadLocalStorage &local_storage = thread_locals_->local();
    if (!local_storage.local_user_data.has_value()) {
      local_storage.local_user_data = context_->user_data->get_local(local_storage.allocator);
    }
    return {&local_storage.allocator, local_storage.local_user_data->get()};
  }
};

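/* Implements the #Params interface that a node's #LazyFunction sees while it is executed by the
 * graph executor. It maps the node-local socket indices onto the input/output state stored in
 * the #Executor. */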
class GraphExecutorLFParams final : public Params {
 private:
  Executor &executor_;
  const Node &node_;
  NodeState &node_state_;
  CurrentTask &current_task_;
  /* Thread-local data of the thread that started executing the node. */
  const Executor::LocalData &caller_local_data_;

 public:
  GraphExecutorLFParams(const LazyFunction &fn,
                        Executor &executor,
                        const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task,
                        const Executor::LocalData &local_data)
      : Params(fn, node_state.enabled_multi_threading),
        executor_(executor),
        node_(node),
        node_state_(node_state),
        current_task_(current_task),
        caller_local_data_(local_data)
  {
  }

 private:
  Executor::LocalData get_local_data()
  {
    if (!node_state_.enabled_multi_threading) {
      /* Can use the thread-local data of the calling thread. */
      return caller_local_data_;
    }
    /* Need to retrieve the thread-local data for the current thread. */
    return executor_.get_local_data();
  }

  void *try_get_input_data_ptr_impl(const int index) const override
  {
    const InputState &input_state = node_state_.inputs[index];
    if (input_state.was_ready_for_execution) {
      return input_state.value;
    }
    return nullptr;
  }

  void *try_get_input_data_ptr_or_request_impl(const int index) override
  {
    const InputState &input_state = node_state_.inputs[index];
    if (input_state.was_ready_for_execution) {
      return input_state.value;
    }
    return executor_.set_input_required_during_execution(
        node_, node_state_, index, current_task_, this->get_local_data());
  }

  void *get_output_data_ptr_impl(const int index) override
  {
    OutputState &output_state = node_state_.outputs[index];
    BLI_assert(!output_state.has_been_computed);
    if (output_state.value == nullptr) {
      LinearAllocator<> &allocator = *this->get_local_data().allocator;
      const CPPType &type = node_.output(index).type();
      output_state.value = allocator.allocate(type.size(), type.alignment());
    }
    return output_state.value;
  }

  void output_set_impl(const int index) override
  {
    OutputState &output_state = node_state_.outputs[index];
    BLI_assert(!output_state.has_been_computed);
    BLI_assert(output_state.value != nullptr);
    const OutputSocket &output_socket = node_.output(index);
    executor_.forward_value_to_linked_inputs(output_socket,
                                             {output_socket.type(), output_state.value},
                                             current_task_,
                                             this->get_local_data());
    output_state.value = nullptr;
    output_state.has_been_computed = true;
  }

  bool output_was_set_impl(const int index) const override
  {
    const OutputState &output_state = node_state_.outputs[index];
    return output_state.has_been_computed;
  }

  ValueUsage get_output_usage_impl(const int index) const override
  {
    const OutputState &output_state = node_state_.outputs[index];
    return output_state.usage_for_execution;
  }

  void set_input_unused_impl(const int index) override
  {
    executor_.set_input_unused_during_execution(
        node_, node_state_, index, current_task_, this->get_local_data());
  }

  bool try_enable_multi_threading_impl() override
  {
    const bool success = executor_.try_enable_multi_threading();
    if (success) {
      node_state_.enabled_multi_threading = true;
    }
    return success;
  }
};

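/* Defined out of line because it needs the full definition of #GraphExecutorLFParams above. */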
inline void Executor::execute_node(const FunctionNode &node,
                                   NodeState &node_state,
                                   CurrentTask &current_task,
                                   const LocalData &local_data)
{
  const LazyFunction &fn = node.function();
  GraphExecutorLFParams node_params{fn, *this, node, node_state, current_task, local_data};

  Context fn_context(node_state.storage, context_->user_data, local_data.local_user_data);

  if (self_.logger_ != nullptr) {
    self_.logger_->log_before_node_execute(node, node_params, fn_context);
  }

  /* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate
   * that the execution will take a while. In this case, other tasks waiting on this thread
   * should be allowed to be picked up by another thread. */
  auto blocking_hint_fn = [&]() {
    if (!current_task.has_scheduled_nodes.load()) {
      return;
    }
    if (!this->try_enable_multi_threading()) {
      return;
    }
    this->push_all_scheduled_nodes_to_task_pool(current_task);
  };

  lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};
  if (self_.node_execute_wrapper_) {
    self_.node_execute_wrapper_->execute_node(node, node_params, fn_context);
  }
  else {
    fn.execute(node_params, fn_context);
  }

  if (self_.logger_ != nullptr) {
    self_.logger_->log_after_node_execute(node, node_params, fn_context);
  }
}

GraphExecutor::GraphExecutor(const Graph &graph,
                             Vector<const OutputSocket *> graph_inputs,
                             Vector<const InputSocket *> graph_outputs,
                             const Logger *logger,
                             const SideEffectProvider *side_effect_provider,
                             const NodeExecuteWrapper *node_execute_wrapper)
    : graph_(graph),
      graph_inputs_(std::move(graph_inputs)),
      graph_outputs_(std::move(graph_outputs)),
      graph_input_index_by_socket_index_(graph.graph_inputs().size(), -1),
      graph_output_index_by_socket_index_(graph.graph_outputs().size(), -1),
      logger_(logger),
      side_effect_provider_(side_effect_provider),
      node_execute_wrapper_(node_execute_wrapper)
{
  debug_name_ = graph.name().c_str();

  /* The graph executor can handle partial execution when there are still missing inputs. */
  allow_missing_requested_inputs_ = true;

  for (const int i : graph_inputs_.index_range()) {
    const OutputSocket &socket = *graph_inputs_[i];
    BLI_assert(socket.node().is_interface());
    inputs_.append({"In", socket.type(), ValueUsage::Maybe});
    graph_input_index_by_socket_index_[socket.index()] = i;
  }
  for (const int i : graph_outputs_.index_range()) {
    const InputSocket &socket = *graph_outputs_[i];
    BLI_assert(socket.node().is_interface());
    outputs_.append({"Out", socket.type()});
    graph_output_index_by_socket_index_[socket.index()] = i;
  }

  /* Preprocess buffer offsets. */
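  /* The resulting single allocation is laid out as follows (a sketch derived from the offsets
   * computed below):
   *   NodeState *          x nodes.size()          (the #node_states_ pointer array)
   *   std::atomic<uint8_t> x graph_inputs_.size()  (the #loaded_inputs_ flags)
   *   <padding up to pointer alignment>
   *   per node: its NodeState, followed by its InputState and OutputState arrays. */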
  int offset = 0;
  const Span<const Node *> nodes = graph_.nodes();
  init_buffer_info_.node_states_array_offset = offset;
  offset += sizeof(NodeState *) * nodes.size();
  init_buffer_info_.loaded_inputs_array_offset = offset;
  offset += sizeof(std::atomic<uint8_t>) * graph_inputs_.size();
  /* Align offset. */
  offset = (offset + sizeof(void *) - 1) & ~(sizeof(void *) - 1);

  init_buffer_info_.node_states_offsets.reinitialize(graph_.nodes().size());
  for (const int i : nodes.index_range()) {
    const Node &node = *nodes[i];
    init_buffer_info_.node_states_offsets[i] = offset;
    offset += sizeof(NodeState);
    offset += sizeof(InputState) * node.inputs().size();
    offset += sizeof(OutputState) * node.outputs().size();
    /* Make sure we don't have to worry about alignment. */
    static_assert(sizeof(NodeState) % sizeof(void *) == 0);
    static_assert(sizeof(InputState) % sizeof(void *) == 0);
    static_assert(sizeof(OutputState) % sizeof(void *) == 0);
  }

  init_buffer_info_.total_size = offset;
}

void GraphExecutor::execute_impl(Params &params, const Context &context) const
{
  Executor &executor = *static_cast<Executor *>(context.storage);
  executor.execute(params, context);
}

void *GraphExecutor::init_storage(LinearAllocator<> &allocator) const
{
  Executor &executor = *allocator.construct<Executor>(*this).release();
  return &executor;
}

void GraphExecutor::destruct_storage(void *storage) const
{
  std::destroy_at(static_cast<Executor *>(storage));
}

std::string GraphExecutor::input_name(const int index) const
{
  const lf::OutputSocket &socket = *graph_inputs_[index];
  return socket.name();
}

std::string GraphExecutor::output_name(const int index) const
{
  const lf::InputSocket &socket = *graph_outputs_[index];
  return socket.name();
}

void GraphExecutor::Logger::log_socket_value(const Socket &socket,
                                             const GPointer value,
                                             const Context &context) const
{
  UNUSED_VARS(socket, value, context);
}

void GraphExecutor::Logger::log_before_node_execute(const FunctionNode &node,
                                                    const Params &params,
                                                    const Context &context) const
{
  UNUSED_VARS(node, params, context);
}

void GraphExecutor::Logger::log_after_node_execute(const FunctionNode &node,
                                                   const Params &params,
                                                   const Context &context) const
{
  UNUSED_VARS(node, params, context);
}

Vector<const FunctionNode *> GraphExecutor::SideEffectProvider::get_nodes_with_side_effects(
    const Context &context) const
{
  UNUSED_VARS(context);
  return {};
}

void GraphExecutor::Logger::dump_when_outputs_are_missing(
    const FunctionNode &node,
    Span<const OutputSocket *> missing_sockets,
    const Context &context) const
{
  UNUSED_VARS(node, missing_sockets, context);
}

void GraphExecutor::Logger::dump_when_input_is_set_twice(const InputSocket &target_socket,
                                                         const OutputSocket &from_socket,
                                                         const Context &context) const
{
  UNUSED_VARS(target_socket, from_socket, context);
}

}  // namespace blender::fn::lazy_function
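
/* Usage sketch (not part of this file): a #GraphExecutor is itself a #LazyFunction, so a caller
 * drives it through the generic lazy-function interface. The graph-building steps below are
 * illustrative assumptions; only the init_storage/execute/destruct_storage flow mirrors what
 * this file implements.
 *
 *   Graph graph;
 *   // ... add function nodes and links, then make node indices valid, e.g.:
 *   // graph.update_node_indices();  (hypothetical helper; this file only asserts
 *   //                                graph.node_indices_are_valid())
 *
 *   GraphExecutor executor{graph, {&graph_input}, {&graph_output},
 *                          nullptr, nullptr, nullptr};
 *
 *   LinearAllocator<> allocator;
 *   void *storage = executor.init_storage(allocator);
 *   Context context{storage, &user_data, &local_user_data};
 *   executor.execute(params, context);  // `params` implements #Params.
 *   executor.destruct_storage(storage);
 */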